Reputation: 495
I have created dataframe as follows :
+----+-------+-------+
| age| number|name |
+----+-------+-------+
| 16| 12|A |
| 16| 13|B |
| 17| 16|E |
| 17| 17|F |
+----+-------+-------+
How to convert it into following json:
{
'age' : 16,
'values' : [{‘number’: ‘12’ , ‘name’ : 'A'},{‘number’: ‘12’ , ‘name’ : 'A'} ]
},{
'age' : 17,
'values' : [{‘number’: ‘16’ , ‘name’ : 'E'},{‘number’: ‘17’ , ‘name’ : 'F'} ]
}
Upvotes: 0
Views: 4660
Reputation: 3427
You can convert the DF to RDD and apply your transformations:
NewSchema = StructType([StructField("age", IntegerType())
, StructField("values", StringType())
])
res_df = df.rdd.map(lambda row: (row[0], ([{'number':row[1], 'name':row[2]}])))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda row: (row[0], json.dumps(row[1])))\
.toDF(NewSchema)
res_df.show(20, False)
Show res_df:
+---+------------------------------------------------------------+
|age|values |
+---+------------------------------------------------------------+
|16 |[{"number": 12, "name": "A"}, [{"number": 13, "name": "B"}] |
|17 |[{"number": 17, "name": "F"}, [{"number": 16, "name": "E"}] |
+---+------------------------------------------------------------+
Saving the DF as JSON File:
res_df.coalesce(1).write.format('json').save('output.json')
Upvotes: 2
Reputation: 15318
assuming df
is your dataframe,
from pyspark.sql import functions as F
new_df = df.select(
"age",
F.struct(
F.col("number"),
F.col("name"),
).alias("values")
).groupBy(
"age"
).agg(
F.collect_list("values").alias("values")
)
new_df.toJSON()
# or
new_df.write.json(...)
Upvotes: 8