Reputation: 19240
I have two datasets:
Id, Name
1, Jack
2, Jill
3, James
Id, Activity, UserId
101, Activity 1, 1
102, Activity 2, 1
201, Activity 3, 2
301, Activity 4, 3
How can I use PySpark to add a column named "Activities" to the User dataset that groups all activities related to a user in JSON format? The expected output is:
Id, Name, Activities
1, Jack, [{Id: 101, Name: Activity 1}, {Id: 102, Name: Activity 2}]
2, Jill, [{Id: 201, Name: Activity 3}]
3, James, [{Id: 301, Name: Activity 4}]
Upvotes: 0
Views: 619
Reputation: 2705
Let's assume you have two DataFrames, dfUser and dfActivities:
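from pyspark.sql.functions import col, collect_list, struct
# Rename the activity Id first so it does not clash with dfUser's Id after the join
acts = dfActivities.withColumnRenamed("Id", "aId")
joinDf = (dfUser.join(acts, col("Id") == col("UserId"))
    .groupBy("Id", "Name")
    # collect_list takes a single column, so wrap both fields in a struct
    .agg(collect_list(struct(col("aId").alias("Id"), col("Activity").alias("Name"))).alias("Activities")))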
Upvotes: 1
Reputation: 3619
Mixing JSON and non-JSON data can be a little tricky. The solution below builds a JSON structure for all columns, including id and Name, so it is a close approximation of the final result.
First, let's create some sample data:
list1 = [(1, "Jack"), (2, "Jill"), (3, "James")]
df1 = spark.createDataFrame(list1, schema=["id", "Name"])
list2 = [(101, "Activity1", 1), (101, "Activity2", 1), (201, "Activity3", 2), (301, "Activity4", 3)]
df2 = spark.createDataFrame(list2, schema=["Id", "Activity", "UserId"])
Then register both DataFrames as temporary views so we can run SQL against them and shape the data the way we want:
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
Then run a SQL query that combines collect_list and named_struct to closely match the structure you need:
df3 = spark.sql("""
    WITH tmp AS (
        SELECT t1.id,
               collect_list(named_struct("id", t2.id, "name", t2.activity)) AS Activities
        FROM table1 t1
        JOIN table2 t2
          ON t1.id = t2.userid
        GROUP BY t1.id
    )
    SELECT tmp.id,
           t3.NAME,
           tmp.activities
    FROM tmp
    JOIN table1 t3
      ON tmp.id = t3.id
""")
df3.toJSON().collect()
This gives me the following result:
['{"id":1,"NAME":"Jack","activities":[{"id":101,"name":"Activity1"},{"id":101,"name":"Activity2"}]}',
'{"id":3,"NAME":"James","activities":[{"id":301,"name":"Activity4"}]}',
'{"id":2,"NAME":"Jill","activities":[{"id":201,"name":"Activity3"}]}']
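Each element returned by toJSON().collect() is a plain JSON string, so it can be parsed on the driver with Python's standard json module. A minimal sketch, using the first string from the output above as a literal (the variable names are only illustrative):

```python
import json

# First string from the toJSON().collect() output shown above
row_json = '{"id":1,"NAME":"Jack","activities":[{"id":101,"name":"Activity1"},{"id":101,"name":"Activity2"}]}'

# Parse the row into a dict with the keys id, NAME and activities
row = json.loads(row_json)
activity_names = [a["name"] for a in row["activities"]]

# Re-serialize only the nested list to get the activities JSON on its own
activities_json = json.dumps(row["activities"])
print(row["NAME"], activity_names, activities_json)
```

This only makes sense for small results, since collect() brings every row back to the driver.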
If I remove the toJSON() transformation and simply show the result, I get:
df3.show()
+---+-----+-----------------------------------+
| id| NAME|                         activities|
+---+-----+-----------------------------------+
|  1| Jack|[[101, Activity1],[101, Activity2]]|
|  3|James|                 [[301, Activity4]]|
|  2| Jill|                 [[201, Activity3]]|
+---+-----+-----------------------------------+
Upvotes: 2