question1234

Reputation: 25

Pyspark: How to set the same id to all the rows that have the same value in another column?

I have a dataset like this one:

  +----------+------------+
  |id        |event       |
  +----------+------------+
  | 1        |A           |
  | 2        |B           |
  | 3        |C           |
  | 4        |C           |
  | 5        |A           |
  | 6        |D           |
  | 7        |B           |
  +----------+------------+

I would like to either modify id or add another column so that all rows with the same value in column "event" share the same id. I would also like the rows to remain in the same order as they are now.

This is how I would like the data to look at the end (the value of "id" doesn't matter, as long as it's unique for each event):

  +----------+------------+
  |id        |event       |
  +----------+------------+
  | 1        |A           |
  | 2        |B           |
  | 3        |C           |
  | 3        |C           |
  | 1        |A           |
  | 4        |D           |
  | 2        |B           |
  +----------+------------+

Upvotes: 2

Views: 2254

Answers (2)

nick

Reputation: 41

Just group your original dataframe by column event and aggregate with max(col('id')); you will get a new dataframe like:

  +----------+------------+
  |event     |maxid       |
  +----------+------------+
  | A        |5           |
  | B        |7           |
  | C        |4           |
  | D        |6           |
  +----------+------------+

The next step is to join this new dataframe with your original dataframe (on column event); the column maxid is what you want.
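
In code, that approach could look roughly like the sketch below (the dataframe construction and the column name maxid are assumptions for illustration; note that a join does not guarantee the original row order, so an explicit sort may still be needed):

import pyspark.sql.functions as f

input_df = spark.createDataFrame([
  [1, 'A'],
  [2, 'B'],
  [3, 'C'],
  [4, 'C'],
  [5, 'A'],
  [6, 'D'],
  [7, 'B']
], ['id', 'event'])

# Take the maximum id per event
max_df = input_df.groupBy('event').agg(f.max('id').alias('maxid'))

# Join back on event; every row with the same event gets the same maxid
output_df = input_df.join(max_df, on='event')
output_df.show()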

Upvotes: 0

Kafels

Reputation: 4059

UPDATE

Adding monotonically_increasing_id() so you can see your data in the original input order after setting an id:

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

from pyspark.sql import Window
import pyspark.sql.functions as f

# Remember the original order, then reuse the first id seen within each event
output_df = (input_df
             .withColumn('order', f.monotonically_increasing_id())
             .withColumn('id', f.first('id').over(Window.partitionBy('event'))))
output_df.sort('order').show()

+---+-----+-----------+
| id|event|      order|
+---+-----+-----------+
|  1|    A| 8589934592|
|  2|    B|17179869184|
|  3|    C|25769803776|
|  3|    C|34359738368|
|  1|    A|42949672960|
|  6|    D|51539607552|
|  2|    B|60129542144|
+---+-----+-----------+

OLD

To "preserve" the dataframe order, create another column and keep id intact to sort whenever you want:

from pyspark.sql import Window
import pyspark.sql.functions as f

input_df = spark.createDataFrame([
  [1, 'A'],
  [2, 'B'],
  [3, 'C'],
  [4, 'C'],
  [5, 'A'],
  [6, 'D'],
  [7, 'B']
], ['id', 'event'])

output_df = input_df.withColumn('group_id', f.first('id').over(Window.partitionBy('event')))
output_df.sort('id').show()

+---+-----+--------+
| id|event|group_id|
+---+-----+--------+
|  1|    A|       1|
|  2|    B|       2|
|  3|    C|       3|
|  4|    C|       3|
|  5|    A|       1|
|  6|    D|       6|
|  7|    B|       2|
+---+-----+--------+

Upvotes: 2
