Aaron.K

Reputation: 25

Convert spark dataframe to nested JSON using pyspark

I'm trying to convert a Spark dataframe to JSON. There are about 1 million rows in this dataframe; the sample code is below, but the performance is really bad. The desired output is that each member_id appears only once in the JSON file, and likewise each tag_name appears only once under a given member_id. Please let me know if there is any way to do this faster.

Sample Code:

import pyspark.sql.functions as ch

result = sdf.groupBy('member_id', 'tag_name') \
    .agg(ch.collect_list(ch.struct('detail_name', 'detail_value')).alias('detail')) \
    .groupBy('member_id') \
    .agg(ch.collect_list(ch.struct('tag_name', 'detail')).alias('tag')) \
    .agg(ch.to_json(ch.collect_list(ch.struct('member_id', 'tag'))).alias('result'))
result.show()

detail.csv:

member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, Service_A, 20
abc123, m1, Service_B, 20
abc123, m2, Service_C, 10
xyz456, m3, Service_A, 5
xyz456, m3, Service_A, 10

Desired Output JSON:

{   "member_id": "abc123",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "Service_A",
                        "detail_value": "20"},
                    {   "detail_name": "Service_B",
                        "detail_value": "20"}]},
            {"tag_name": "m2",
            "detail":[{ "detail_name": "Service_C",
                        "detail_value": "10"}]}]},
{   "member_id": "xyz456",
    "tag":[{"tag_name": "m3",
            "detail":[{ "detail_name": "Service_A",
                        "detail_value": "5"},
                      { "detail_name": "Service_A",
                        "detail_value": "10"}]}]}

duplicate.csv:

member_id, tag_name, detail_name, detail_value
-------------------------------------------------------
abc123, m1, problem_no, 'abc123xyz'
abc123, m1, problem_no, 'abc456zzz'
xyz456, m1, problem_no, 'abc123xyz'
xyz456, m1, problem_no, 'abc456zzz'

Duplicate Output JSON:

{   "member_id": "abc123",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"},
                      { "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"}]}]},
{   "member_id": "xyz456",
    "tag":[ {"tag_name": "m1",
            "detail":[{ "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"},
                      { "detail_name": "problem_no",
                        "detail_value": "abc123xyz"},
                    {   "detail_name": "problem_no",
                        "detail_value": "abc456zzz"}]}]}

Upvotes: 0

Views: 2241

Answers (1)

过过招

Reputation: 4244

Do you mind implementing it through a SQL statement?

Construct the structs layer by layer, and finally use the to_json function to generate the JSON string.

df.createOrReplaceTempView('tmp')
sql = """
    select to_json(collect_list(struct(member_id, tag))) as member
    from
        -- second level: one row per member_id with its array of tags
        (select member_id, collect_list(struct(tag_name, detail)) as tag
        from
            -- first level: one row per (member_id, tag_name) with its detail array
            (select member_id, tag_name, collect_list(struct(detail_name, detail_value)) as detail
            from tmp
            group by member_id, tag_name)
        group by member_id)
"""
df = spark.sql(sql)
df.show(truncate=False)
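Note that the top-level to_json(collect_list(...)) gathers every member into a single row, so all of the data is funneled through one task, which is a likely bottleneck at a million rows. A minimal sketch of an alternative, assuming the same column names and a placeholder output path, is to stop one aggregation short and let Spark write one JSON object per member_id, in parallel:

import pyspark.sql.functions as ch

# Build the nested structure per member_id, but skip the final
# collect_list/to_json that squeezes everything into one row.
nested = (sdf
    .groupBy('member_id', 'tag_name')
    .agg(ch.collect_list(ch.struct('detail_name', 'detail_value')).alias('detail'))
    .groupBy('member_id')
    .agg(ch.collect_list(ch.struct('tag_name', 'detail')).alias('tag')))

# Each output line is one {"member_id": ..., "tag": [...]} object,
# written in parallel across partitions. 'output/members_json' is a
# placeholder path.
nested.write.mode('overwrite').json('output/members_json')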

Upvotes: 2
