Ajay

Reputation: 198

Spark - Maintaining order of data across columns during groupBy and collect

If I have

ID  Name     Code    Value
1   Person1  A       12
1   Person2  B       15

And I do a

df.groupBy("ID").agg(
collect_set("Name").alias("Name"),
collect_set("Code").alias("Code"),
collect_set("Value").alias("Value")
)

I might get a

1, [Person1, Person2], [B,A], [15,12]

I need to get a

1, [Person1, Person2], [A,B], [12,15]

How do I ensure the same order across all columns?

My actual df has 70 columns. I need to group by one column and pick the first 5 unique values for each column, in the correct order.

Any suggestions are deeply appreciated.

Upvotes: 1

Views: 92

Answers (2)

ForeverLearner

Reputation: 2113

Sets do not preserve order, but you can impose an order on the collected arrays by embedding some satellite data before collecting.

You could prepend the person's name to each attribute, as below:

val df2 = df.map { each =>
  val person = each.getString(1)
  (each.getInt(0), person + "|" + each.getString(1), person + "|" + each.getString(2), person + "|" + each.getInt(3))
}.toDF("ID", "Name", "Code", "Value")

Now you can apply sort_array after collect_set: every column's array gets sorted by the person's name prefix, so all columns stay aligned.

val df3 = df2.groupBy("ID").agg(
  sort_array(collect_set("Name")).alias("Name"),
  sort_array(collect_set("Code")).alias("Code"),
  sort_array(collect_set("Value")).alias("Value"))

Note that each attribute has the person info attached in the resulting list; you can strip the prefix afterwards if needed (e.g. by splitting on "|").

df3.show
+---+--------------------+--------------------+--------------------+
| ID|                Name|                Code|               Value|
+---+--------------------+--------------------+--------------------+
|  1|[Person1|Person1,...|[Person1|A, Perso...|[Person1|12, Pers...|
+---+--------------------+--------------------+--------------------+
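The mechanics of the trick can be illustrated with plain Scala collections (a sketch only, no Spark; the data mirrors the question's example):

```scala
// Rows: (ID, Name, Code, Value) -- the question's sample data, out of order.
val rows = Seq((1, "Person2", "B", 15), (1, "Person1", "A", 12))

// Prefix every attribute with the person's name (the "satellite data").
val tagged = rows.map { case (id, name, code, value) =>
  (id, s"$name|$name", s"$name|$code", s"$name|$value")
}

// Because every column now starts with the same sort key, sorting each
// collected column independently yields the same row order in all three.
val names  = tagged.map(_._2).distinct.sorted  // List(Person1|Person1, Person2|Person2)
val codes  = tagged.map(_._3).distinct.sorted  // List(Person1|A, Person2|B)
val values = tagged.map(_._4).distinct.sorted  // List(Person1|12, Person2|15)
```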

Upvotes: 0

Raphael Roth

Reputation: 27373

You cannot rely on the order within the collected sets. I would suggest packing the attributes into a struct, which gives you one array instead of three:

df.groupBy("ID").agg(
  collect_list(struct("Name","Code","Value").as("Attribute")).as("Attributes")
)
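This also helps with the "first 5 unique values in the correct order" requirement: with one array of structs you can deduplicate, sort, and slice once, and the attributes of each row stay together. The idea in plain Scala collections (a sketch, not the Spark API; in Spark the analogous calls would be collect_set, sort_array, and slice):

```scala
// Each row's attributes stay together as one tuple (the "struct").
// Duplicate row included to show deduplication.
val rows = Seq((1, "Person2", "B", 15), (1, "Person1", "A", 12), (1, "Person2", "B", 15))

// Group by ID, keep distinct attribute tuples, sort by Name, take the first 5.
val grouped = rows
  .groupBy(_._1)
  .map { case (id, rs) =>
    id -> rs.map { case (_, n, c, v) => (n, c, v) }.distinct.sortBy(_._1).take(5).toList
  }
// grouped(1) == List(("Person1", "A", 12), ("Person2", "B", 15))
```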

Upvotes: 3
