Reputation: 1161
I have a dataframe with the following columns - User, Order, Food.
For example:
df = spark.createDataFrame(pd.DataFrame([['A','B','A','C','A'],[1,1,2,1,3],['Eggs','Salad','Peaches','Bread','Water']],index=['User','Order','Food']).T)
I would like to concatenate all of the foods into a single string, sorted by order and grouped per user.
If I run the following:
df.groupBy("User").agg(concat_ws(" $ ",collect_list("Food")).alias("Food List"))
I get a single list but the foods are not concatenated in order.
User Food List
B Salad
C Bread
A Eggs $ Water $ Peaches
What is a good way to get the food list concatenated in order?
Upvotes: 2
Views: 4963
Reputation: 1161
Based on the possible duplicate comment ("collect_list by preserving order based on another variable"), I was able to come up with a solution.
First define a sorter function. It takes a list of (Order, Food) structs, sorts them by Order, and returns the Food values joined into a single string separated by ' $ ':
# define udf
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def sorter(l):
    res = sorted(l, key=lambda x: x.Order)
    return ' $ '.join([item[1] for item in res])

sort_udf = udf(sorter, StringType())
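Since struct fields behave like tuple elements inside the UDF, the sorting-and-joining logic can be sanity-checked locally with plain (Order, Food) tuples before wrapping it in a UDF (a quick sketch, no Spark needed; x[0] stands in for x.Order):

```python
# Local check of the sorter logic using plain tuples in place of Spark structs
def sorter(l):
    res = sorted(l, key=lambda x: x[0])
    return ' $ '.join([item[1] for item in res])

print(sorter([(2, 'Peaches'), (1, 'Eggs'), (3, 'Water')]))  # Eggs $ Peaches $ Water
```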
Then create the struct and run the sorter function:
SortedFoodList = (df.groupBy("User")
.agg(collect_list(struct("Order","Food")).alias("food_list"))
.withColumn("sorted_foods",sort_udf("food_list"))
.drop("food_list")
)
Upvotes: 2
Reputation: 1669
Try using a window here:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, pandas_udf, PandasUDFType
from pyspark.sql.types import *
df = spark.createDataFrame(pd.DataFrame([['A','B','A','C','A'],[1,1,2,1,3],['Eggs','Salad','Peaches','Bread','Water']],index=['User','Order','Food']).T)
df.show()
+----+-----+-------+
|User|Order| Food|
+----+-----+-------+
| A| 1| Eggs|
| B| 1| Salad|
| A| 2|Peaches|
| C| 1| Bread|
| A| 3| Water|
+----+-----+-------+
Define the window and a pandas UDF to join the strings:
w = Window.partitionBy('User').orderBy('Order').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
@pandas_udf(StringType(), PandasUDFType.GROUPED_AGG)
def _udf(v):
    return ' $ '.join(v)
df = (df.withColumn('Food List', _udf(df['Food']).over(w))
        .dropDuplicates(['User', 'Food List'])
        .drop('Order', 'Food'))
df.show(truncate=False)
+----+----------------------+
|User|Food List |
+----+----------------------+
|B |Salad |
|C |Bread |
|A |Eggs $ Peaches $ Water|
+----+----------------------+
Upvotes: 2