SuFi

Reputation: 385

PySpark grouping, joining and converting to JSON

I have two Spark DataFrames.

  1. User Df
    • userId
    • UserName
    • Address
  2. Order Df
    • UserId
    • ProductName
    • ProductDesc
    • CategoryName
    • CategoryId
    • CategoryDesc
    • Price

Sample data: User Df

+------+----+--------+
|userId|name|Addreshh|
+------+----+--------+
|     1|Sufi|   Reons|
|     2|Ragu|  Random|
+------+----+--------+

Order df

+------+-----------+-----------+------------+----------+------------+-----+
|userId|ProductName|ProductDesc|CategoryName|CategoryId|CategoryDesc|Price|
+------+-----------+-----------+------------+----------+------------+-----+
|     1|         A1|      A1Dec|           A|         1|        Adec|    5|
|     1|         A2|      A2Dec|           A|         1|        Adec|   10|
|     1|         B1|      A1Dec|           B|         2|        Bdec|   11|
|     2|         B4|      A4Dec|           B|         2|        Bdec|   15|
+------+-----------+-----------+------------+----------+------------+-----+

I need to group and aggregate the Order DF (to create a nested schema) and join it with the User DF. Then I need to create a JSON file for each record.

e.g. JSON 1:

{
  "userId": 1,
  "name": "Sufi",
  "address": "Reons",
  "order": [
    {
      "name": "A1",
      "price": 5,
      "category": {
        "Id": 1,
        "name": "A",
        "desc": "ADesc"
      }
    },
    {
      "name": "A2",
      "price": 10,
      "category": {
        "Id": 1,
        "name": "A",
        "desc": "ADesc"
      }
    },
    {
      "name": "B1",
      "price": 11,
      "category": {
        "Id": 2,
        "name": "B",
        "desc": "BDesc"
      }
    }
  ]
}

Upvotes: 0

Views: 57

Answers (2)

mck

Reputation: 42342

Join the two dataframes and use collect_list to collect the orders for each user. Write JSON files as output, partitioned by userId. One folder will be created per userId (two in this example), and each folder will contain a single JSON file. Spark can't rename or move the files, so you'll probably need some os operations to rename/move them as you wish.

import pyspark.sql.functions as F

orderdf2 = orderdf.select('userId',
    F.struct(
        F.col('ProductName').alias('name'),
        F.col('Price').alias('price'),
        F.struct(
            F.col('CategoryId').alias('Id'),
            F.col('CategoryName').alias('name'),
            F.col('CategoryDesc').alias('desc')
        ).alias('category')
    ).alias('order')
)

userdf.join(
    orderdf2, 'userId'
).groupBy(
    'userId','name','address'
).agg(
    F.collect_list('order').alias('order')
).write.partitionBy('userId').json('result')
==> userId=1/part-00144-845806db-0700-4585-bb45-01648432abc1.c000.json <==
{"name":"Sufi","address":"Reons","order":[{"name":"A1","price":5,"category":{"Id":"1","name":"A","desc":"Adec"}},{"name":"A2","price":10,"category":{"Id":"1","name":"A","desc":"Adec"}},{"name":"B1","price":11,"category":{"Id":"2","name":"B","desc":"Bdec"}}]}

==> userId=2/part-00189-845806db-0700-4585-bb45-01648432abc1.c000.json <==
{"name":"Ragu","address":"Random","order":[{"name":"B4","price":15,"category":{"Id":"2","name":"B","desc":"Bdec"}}]}

Upvotes: 1

stack0114106

Reputation: 8711

Spark-sql solution:

 val df = spark.sql(""" with t1 (
 select  1 c1,   'Sufi' c2, 'Reons' c3  union all
 select  2 c1,   'Ragu' c2, 'Random' c3
  )  select   c1  userId,   c2  name,   c3 Addreshh    from t1
""")

 val order_df = spark.sql(""" with t1 (
 select  1 c1,   'A1' c2, 'A1Dec' c3, 'A' c4, 1 c5,   'Adec' c6, 5 c7    union all
 select  1 c1,   'A2' c2, 'A2Dec' c3, 'A' c4, 1 c5,   'Adec' c6, 10 c7    union all
 select  1 c1,   'B1' c2, 'A1Dec' c3, 'B' c4, 2 c5,   'Bdec' c6, 11 c7    union all
 select  2 c1,   'B4' c2, 'A4Dec' c3, 'B' c4, 2 c5,   'Bdec' c6, 15 c7
  )  select   c1  userId,   c2  ProductName,   c3  ProductDesc,   c4  CategoryName,   c5  CategoryId,   c6  CategoryDesc,   c7 Price    from t1
""")

df.createOrReplaceTempView("cust")
order_df.createOrReplaceTempView("order")

val dj_src1 = spark.sql(""" select userId, collect_list(named_struct('name',ProductName,'price',Price,'category',category )) order from 
( select userId, ProductName, Price, named_struct('id', CategoryId,'name',CategoryName,'desc', CategoryDesc ) category from order ) temp
group by 1  
""")

dj_src1.createOrReplaceTempView("src1")

val dj2 = spark.sql(""" select a.userId, a.name, a.Addreshh, b.order 
from cust a join 
src1 b on
a.userId=b.userid
""")

dj2.toJSON.show(false)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"userId":1,"name":"Sufi","Addreshh":"Reons","order":[{"name":"A1","price":5,"category":{"id":1,"name":"A","desc":"Adec"}},{"name":"A2","price":10,"category":{"id":1,"name":"A","desc":"Adec"}},{"name":"B1","price":11,"category":{"id":2,"name":"B","desc":"Bdec"}}]}|
|{"userId":2,"name":"Ragu","Addreshh":"Random","order":[{"name":"B4","price":15,"category":{"id":2,"name":"B","desc":"Bdec"}}]}                                                                                                                                          |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Upvotes: 0
