Steven Van Ingelgem

Reputation: 1029

Create a complex dataframe on (py)spark

I want to know how I can create the following “complex” JSON structure in (py)Spark (2.3.2):

Test data set:

# `sc` is the SparkContext of an active SparkSession
df = sc.parallelize([
    [1, 'a', 'x1'],
    [1, 'a', 'x2'],
    [2, 'b', 'y1'],
    [2, 'b', 'y2'],
    [3, 'c', 'z'],
]).toDF('id: integer, field1: string, field2: string').cache()
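
For reference, this is the schema the test set should end up with:

df.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- field1: string (nullable = true)
#  |-- field2: string (nullable = true)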

Code:

import pyspark.sql.functions as F

out = df.groupBy('id').agg(F.to_json(F.create_map(
    F.lit('key1'),
    F.col('field1'),              # map value: string
    F.lit('info'),
    F.collect_list(F.create_map(  # map value: array<map<string,string>>
        F.lit('number'),
        F.col('id'),
        F.lit('key2'),
        F.col('field2')
    ))
)).alias('info'))

My target JSON structure is a data set like this:

[
(id) 1, (info) {"key1": "a", "info": [{"number": 1, "key2": "x1"}, {"number": 1, "key2": "x2"}]},
(id) 2, (info) {"key1": "b", "info": [{"number": 2, "key2": "y1"}, {"number": 2, "key2": "y2"}]},
(id) 3, (info) {"key1": "c", "info": [{"number": 3, "key2": "z"}]}
]

How could I achieve this? (Can I achieve this at all?) I keep getting the following error:

org.apache.spark.sql.AnalysisException:
cannot resolve 'map('key1', `field1`, 'info', collect_list(map('number',
  CAST(`id` AS STRING), 'key2', CAST(`field2` AS STRING))))'
due to data type mismatch: The given values of function map should all be the same type,
  but they are [string, array<map<string,string>>]

What I understand from this error is that field1 is a string, and the value of 'info' is not. But that's the way I want it to be... So, could I achieve this another way?
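
As far as I can tell, a map column needs one common value type, so even this minimal variant (mixing a string value with an array value) fails the same way:

# Minimal reproduction: one string value next to one array<string> value
# inside a single map triggers the same AnalysisException.
df.select(F.create_map(
    F.lit('key1'), F.col('field1'),          # value type: string
    F.lit('info'), F.array(F.col('field2'))  # value type: array<string>
)).show()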

Thanks!

Upvotes: 0

Views: 197

Answers (1)

Steven Van Ingelgem

Reputation: 1029

I found one (hackish) way to do this... I don't like it very much, but seeing that no one in this community has posted an answer, I'm starting to think it's not that easy.

So first of all, I split the "big" aggregation in two (grouping by field1 as well, since it is constant within each id):

out = df.groupBy('id', 'field1').agg(
    F.to_json(F.create_map(
        F.lit('key1'),
        F.col('field1'),
        F.lit('info'),
        F.lit('%%replace%%')  # placeholder, substituted below
    )).alias('first'),
    F.to_json(F.collect_list(F.create_map(
        F.lit('number'),
        F.col('id'),
        F.lit('key2'),
        F.col('field2')
    ))).alias('second')
)

This will generate the following table:

+---+------+---------------------------------+-------------------------------------------------------+
|id |field1|first                            |second                                                 |
+---+------+---------------------------------+-------------------------------------------------------+
|3  |c     |{"key1":"c","info":"%%replace%%"}|[{"number":"3","key2":"z"}]                            |
|2  |b     |{"key1":"b","info":"%%replace%%"}|[{"number":"2","key2":"y1"},{"number":"2","key2":"y2"}]|
|1  |a     |{"key1":"a","info":"%%replace%%"}|[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]|
+---+------+---------------------------------+-------------------------------------------------------+

And now you combine them together:

df2 = (out
       .withColumn('final', F.expr("REPLACE(first, '\"%%replace%%\"', second)"))
       .drop('first', 'second'))
df2.show(10, False)

+---+------+---------------------------------------------------------------------------+
|id |field1|final                                                                      |
+---+------+---------------------------------------------------------------------------+
|3  |c     |{"key1":"c","info":[{"number":"3","key2":"z"}]}                            |
|2  |b     |{"key1":"b","info":[{"number":"2","key2":"y1"},{"number":"2","key2":"y2"}]}|
|1  |a     |{"key1":"a","info":[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]}|
+---+------+---------------------------------------------------------------------------+

A bit unorthodox, but no complaints from Spark :)
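
As an aside: if a struct is acceptable instead of a map, the placeholder trick might not be needed at all, since struct fields (unlike map values) may have different types. A sketch along those lines, which I haven't verified on 2.3.2:

# Hedged sketch: build the nested structure with F.struct instead of
# F.create_map, so 'key1' (string) and 'info' (array of structs) can coexist.
out2 = (
    df.groupBy('id', 'field1')
      .agg(F.collect_list(F.struct(
          F.col('id').alias('number'),
          F.col('field2').alias('key2')
      )).alias('info'))
      .select('id', F.to_json(F.struct(
          F.col('field1').alias('key1'),
          F.col('info')
      )).alias('info'))
)

This should also keep "number" as an integer in the JSON instead of the string it gets cast to inside a map.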

Upvotes: 1
