Tuong Le

Reputation: 19240

Group children objects to JSON array

I have two datasets:

  1. User
Id, Name
1, Jack
2, Jill
3, James
  2. Activities
Id, Activity, UserId
101, Activity 1, 1
102, Activity 2, 1
201, Activity 3, 2
301, Activity 4, 3

How can I use PySpark to add a column named "Activities" to the User dataset that groups all activities related to a user in JSON format? The expected output is:

Id, Name, Activities
1, Jack, [{Id: 101, Name: Activity 1}, {Id: 102, Name: Activity 2}]
2, Jill, [{Id: 201, Name: Activity 3}]
3, James, [{Id: 301, Name: Activity 4}]

Upvotes: 0

Views: 619

Answers (2)

howie

Reputation: 2705

Let's assume you have two DataFrames, dfUser and dfActivities:


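from pyspark.sql.functions import col, collect_list, struct

# Rename the activity Id first so it does not clash with the user Id in the join
activities = dfActivities.withColumnRenamed("Id", "aId")
joinDf = (dfUser.join(activities, col("Id") == col("UserId"))
                .groupBy("Id")
                # collect_list takes a single column, so pack both fields into a struct
                .agg(collect_list(struct("aId", "Activity")).alias("Activities")))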
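
For readers new to collect_list, the join-then-group step above does roughly what the following plain-Python sketch does. It involves no Spark at all; the variable names are illustrative, the rows are copied from the question, and serializing with json.dumps is an assumption about what "JSON format" should mean for the final column.

```python
import json
from collections import defaultdict

# Sample rows copied from the question
users = [(1, "Jack"), (2, "Jill"), (3, "James")]
activities = [
    (101, "Activity 1", 1),
    (102, "Activity 2", 1),
    (201, "Activity 3", 2),
    (301, "Activity 4", 3),
]

# "groupBy + collect_list": gather each user's activities into a list of records
by_user = defaultdict(list)
for activity_id, activity_name, user_id in activities:
    by_user[user_id].append({"Id": activity_id, "Name": activity_name})

# "join": attach the grouped list to each user, serialized as a JSON array
result = [(uid, name, json.dumps(by_user.get(uid, []))) for uid, name in users]

for row in result:
    print(row)
```

A user with no activities ends up with an empty array here. In Spark, an inner join would drop that user entirely, so a left join may be the better fit if such users must be kept.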

Upvotes: 1

Pushkr

Reputation: 3619

Mixing non-JSON and JSON data can be a little tricky. The solution below creates a JSON structure for all columns, including id and Name, so it is a close approximation of the final result.

First, let's create some sample data:

list1 = [[1, "Jack"], [2, "Jill"], [3, "James"]]
df1 = spark.createDataFrame(list1, schema=["id", "Name"])

list2 = [[101, "Activity1", 1], [101, "Activity2", 1], [201, "Activity3", 2], [301, "Activity4", 3]]
df2 = spark.createDataFrame(list2, schema=["Id", "Activity", "UserId"])

Then register both DataFrames as temporary tables so we can run SQL on them to shape the data the way we want:

# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")

Then run a SQL query that combines collect_list and named_struct to closely match the structure you want:

df3= spark.sql("""
    WITH tmp 
     AS (SELECT t1.id, 
                Collect_list(Named_struct("id", t2.id, "name", t2.activity)) AS 
                   Activities 
         FROM   table1 t1 
                JOIN table2 t2 
                  ON ( t1.id = t2.userid ) 
         GROUP  BY t1.id) 
    SELECT tmp.id, 
           t3.NAME, 
           tmp.activities 
    FROM   tmp 
           JOIN table1 t3 
             ON ( tmp.id = t3.id ) 
        """)

df3.toJSON().collect()

This gives the following result:

['{"id":1,"NAME":"Jack","activities":[{"id":101,"name":"Activity1"},{"id":101,"name":"Activity2"}]}',
 '{"id":3,"NAME":"James","activities":[{"id":301,"name":"Activity4"}]}',
 '{"id":2,"NAME":"Jill","activities":[{"id":201,"name":"Activity3"}]}']
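
Each element returned by toJSON().collect() is an ordinary JSON string, so it can be parsed with Python's standard json module. A small sketch, using the first row copied from the output above (the variable names are illustrative):

```python
import json

# First row copied from the toJSON().collect() output above
row = '{"id":1,"NAME":"Jack","activities":[{"id":101,"name":"Activity1"},{"id":101,"name":"Activity2"}]}'

# Parse the string into nested dicts and lists
record = json.loads(row)

# The grouped activities come back as a list of dicts
activity_names = [a["name"] for a in record["activities"]]
print(record["NAME"], activity_names)
```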

If I remove the toJSON() transformation and simply show the result, it looks like this:

df3.show()

+---+-----+-----------------------------------+
| id| NAME|          activities               |
+---+-----+-----------------------------------+
|  1| Jack|[[101, Activity1],[101, Activity2]]|
|  3|James|  [[301, Activity4]]               |
|  2| Jill|  [[201, Activity3]]               |
+---+-----+-----------------------------------+

Upvotes: 2
