Reputation: 385
I have two Spark DataFrames.
Sample data: User df
+------+----+--------+
|userId|name|Addreshh|
+------+----+--------+
| 1|Sufi| Reons|
| 2|Ragu| Random|
+------+----+--------+
Order df
+------+-----------+-----------+------------+----------+------------+-----+
|userId|ProductName|ProductDesc|CategoryName|CategoryId|CategoryDesc|Price|
+------+-----------+-----------+------------+----------+------------+-----+
| 1| A1| A1Dec| A| 1| Adec| 5|
| 1| A2| A2Dec| A| 1| Adec| 10|
| 1| B1| A1Dec| B| 2| Bdec| 11|
| 2| B4| A4Dec| B| 2| Bdec| 15|
+------+-----------+-----------+------------+----------+------------+-----+
I need to group and aggregate (create a nested schema) the order df, join it with the user df, and then create a JSON file for each record.
e.g. JSON 1:
{
"userId": 1,
"name": "Sufi",
"address": "Reons",
"order": [
{
"name": "A1",
"price": 5,
"category": {
"Id": 1,
"name": "A",
"desc": "Adec"
}
},
{
"name": "A2",
"price": 10,
"category": {
"Id": 1,
"name": "A",
"desc": "Adec"
}
},
{
"name": "B1",
"price": 11,
"category": {
"Id": 2,
"name": "B",
"desc": "Bdec"
}
}
]
}
Upvotes: 0
Views: 57
Reputation: 42342
Join the two dataframes and use collect_list to collect the orders for each user, then write the result as JSON partitioned by userId. One folder is created per userId, and each folder contains a single JSON file. Spark can't rename or move the files for you, so you'll probably need some os operations to rename/move them as you wish.
import pyspark.sql.functions as F

# Nest each order row as a struct, then collect all orders per user
orderdf2 = orderdf.select(
    'userId',
    F.struct(
        F.col('ProductName').alias('name'),
        F.col('Price').alias('price'),
        F.struct(
            F.col('CategoryId').alias('Id'),
            F.col('CategoryName').alias('name'),
            F.col('CategoryDesc').alias('desc')
        ).alias('category')
    ).alias('order')
).groupBy('userId').agg(
    F.collect_list('order').alias('order')
)

# Join with the user df and write one folder (one file) per userId.
# Note: the orders are already aggregated above, so no second groupBy is needed.
userdf.withColumnRenamed('Addreshh', 'address').join(
    orderdf2, 'userId'
).write.partitionBy('userId').json('result')
==> userId=1/part-00144-845806db-0700-4585-bb45-01648432abc1.c000.json <==
{"name":"Sufi","address":"Reons","order":[{"name":"A1","price":5,"category":{"Id":"1","name":"A","desc":"Adec"}},{"name":"A2","price":10,"category":{"Id":"1","name":"A","desc":"Adec"}},{"name":"B1","price":11,"category":{"Id":"2","name":"B","desc":"Bdec"}}]}
==> userId=2/part-00189-845806db-0700-4585-bb45-01648432abc1.c000.json <==
{"name":"Ragu","address":"Random","order":[{"name":"B4","price":15,"category":{"Id":"2","name":"B","desc":"Bdec"}}]}
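As mentioned above, Spark won't move or rename the part files itself. A minimal, Spark-free sketch of that cleanup step (the function name flatten_partitions and the user_&lt;id&gt;.json naming scheme are my own assumptions, not part of the Spark output):

```python
import glob
import os
import shutil

def flatten_partitions(result_dir="result"):
    # Find each part file written under result/userId=<id>/
    for part in glob.glob(os.path.join(result_dir, "userId=*", "part-*.json")):
        folder = os.path.dirname(part)                       # e.g. result/userId=1
        user_id = os.path.basename(folder).split("=", 1)[1]  # e.g. "1"
        target = os.path.join(result_dir, f"user_{user_id}.json")
        shutil.move(part, target)   # one flat json file per user
        shutil.rmtree(folder)       # drop the now-empty partition directory
```

Run it once after the write completes; on HDFS or S3 you'd use the corresponding filesystem API instead of os/shutil.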
Upvotes: 1
Reputation: 8711
Spark SQL solution:
val df = spark.sql(""" with t1 as (
select 1 c1, 'Sufi' c2, 'Reons' c3 union all
select 2 c1, 'Ragu' c2, 'Random' c3
) select c1 userId, c2 name, c3 Addreshh from t1
""")
val order_df = spark.sql(""" with t1 as (
select 1 c1, 'A1' c2, 'A1Dec' c3, 'A' c4, 1 c5, 'Adec' c6, 5 c7 union all
select 1 c1, 'A2' c2, 'A2Dec' c3, 'A' c4, 1 c5, 'Adec' c6, 10 c7 union all
select 1 c1, 'B1' c2, 'A1Dec' c3, 'B' c4, 2 c5, 'Bdec' c6, 11 c7 union all
select 2 c1, 'B4' c2, 'A4Dec' c3, 'B' c4, 2 c5, 'Bdec' c6, 15 c7
) select c1 userId, c2 ProductName, c3 ProductDesc, c4 CategoryName, c5 CategoryId, c6 CategoryDesc, c7 Price from t1
""")
df.createOrReplaceTempView("cust")
order_df.createOrReplaceTempView("order")
val dj_src1 = spark.sql(""" select userId, collect_list(named_struct('name',ProductName,'price',Price,'category',category )) order from
( select userId, ProductName, Price, named_struct('id', CategoryId,'name',CategoryName,'desc', CategoryDesc ) category from order ) temp
group by 1
""")
dj_src1.createOrReplaceTempView("src1")
val dj2 = spark.sql(""" select a.userId, a.name, a.Addreshh, b.order
from cust a join
src1 b on
a.userId=b.userid
""")
dj2.toJSON.show(false)
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"userId":1,"name":"Sufi","Addreshh":"Reons","order":[{"name":"A1","price":5,"category":{"id":1,"name":"A","desc":"Adec"}},{"name":"A2","price":10,"category":{"id":1,"name":"A","desc":"Adec"}},{"name":"B1","price":11,"category":{"id":2,"name":"B","desc":"Bdec"}}]}|
|{"userId":2,"name":"Ragu","Addreshh":"Random","order":[{"name":"B4","price":15,"category":{"id":2,"name":"B","desc":"Bdec"}}]} |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
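Each row of the result above is already one complete JSON document per user, so producing "a json file for each record" is just a matter of collecting and writing them. A hedged, Spark-free sketch of that final step, assuming you have the list of strings that dj2.toJSON.collect() would return (the write_per_user helper and its file naming are illustrative, not a Spark API):

```python
import json
import os

def write_per_user(json_rows, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    for row in json_rows:                 # each row is one user's JSON document
        record = json.loads(row)          # parse only to pull out the userId
        path = os.path.join(out_dir, f"user_{record['userId']}.json")
        with open(path, "w") as f:
            f.write(row)
```

collect() brings all rows to the driver, so this is only sensible when the number of users is small enough to fit in driver memory.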
Upvotes: 0