Reputation: 1029
I want to know how I can create the following "complex" JSON structure in PySpark (2.3.2):
Test data set:
df = sc.parallelize([
    [1, 'a', 'x1'],
    [1, 'a', 'x2'],
    [2, 'b', 'y1'],
    [2, 'b', 'y2'],
    [3, 'c', 'z'],
]).toDF('id: integer, field1: string, field2: string').cache()
Code:
import pyspark.sql.functions as F
out = df.groupBy('id').agg(F.to_json(F.create_map(
    F.lit('key1'),
    F.col('field1'),
    F.lit('info'),
    F.collect_list(F.create_map(
        F.lit('number'),
        F.col('id'),
        F.lit('key2'),
        F.col('field2')
    ))
)).alias('info'))
My target is a data set with one JSON document per id, like this:
[
(id) 1, (info) {"key1": "a", "info": [{"number": 1, "key2": "x1"}, {"number": 1, "key2": "x2"}]},
(id) 2, (info) {"key1": "b", "info": [{"number": 2, "key2": "y1"}, {"number": 2, "key2": "y2"}]},
(id) 3, (info) {"key1": "c", "info": [{"number": 3, "key2": "z"}]}
]
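To pin down exactly what I am after, here is a rough sketch in plain Python (not Spark) that builds the target from the same test rows. The function name build_target is just something I made up for illustration, and it assumes the data fits in memory, which is of course not the point of Spark:

```python
import json
from itertools import groupby

# Same rows as the test data set: (id, field1, field2).
rows = [
    (1, 'a', 'x1'),
    (1, 'a', 'x2'),
    (2, 'b', 'y1'),
    (2, 'b', 'y2'),
    (3, 'c', 'z'),
]

def build_target(rows):
    """Group rows by (id, field1) and nest the field2 values under 'info'."""
    result = []
    # groupby only merges adjacent rows, so sort on the grouping key first.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for (id_, field1), group in groupby(ordered, key=lambda r: (r[0], r[1])):
        info = {
            'key1': field1,
            'info': [{'number': r[0], 'key2': r[2]} for r in group],
        }
        result.append((id_, json.dumps(info)))
    return result

target = build_target(rows)
for id_, info in target:
    print(id_, info)
```

Note that the outer object mixes a string value ('key1') with a list value ('info'), which is the part I cannot express with create_map.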
How can I achieve this (if it is possible at all)? I always get the following error:
org.apache.spark.sql.AnalysisException:
cannot resolve 'map('key1', `field1`, 'info', collect_list(map('number',
CAST(`id` AS STRING), 'key2', CAST(`field2` AS STRING))))'
due to data type mismatch: The given values of function map should all be the same type,
but they are [string, array<map<string,string>>]
As I understand this error, all values in a map must have the same type: field1 is a string, while the value of 'info' is an array of maps. But that is exactly the structure I want. So, can I achieve this another way?
Thanks!
Upvotes: 0
Views: 197
Reputation: 1029
I found one (hackish) way to do this. I don't like it very much, but since nobody in this community has posted an answer, I am starting to think it's not that easy.
So first of all, I split the "big" aggregation into two:
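out = df.groupBy('id', 'field1').agg(
    # Outer object, with a placeholder string where the array will go.
    F.to_json(F.create_map(
        F.lit('key1'), F.col('field1'),
        F.lit('info'), F.lit('%%replace%%')
    )).alias('first'),
    # Inner array of maps; map values must share one type, so id is cast to string.
    F.to_json(F.collect_list(F.create_map(
        F.lit('number'), F.col('id'),
        F.lit('key2'), F.col('field2')
    ))).alias('second')
)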
This will generate the following table:
+---+------+---------------------------------+-------------------------------------------------------+
|id |field1|first |second |
+---+------+---------------------------------+-------------------------------------------------------+
|3 |c |{"key1":"c","info":"%%replace%%"}|[{"number":"3","key2":"z"}] |
|2 |b |{"key1":"b","info":"%%replace%%"}|[{"number":"2","key2":"y1"},{"number":"2","key2":"y2"}]|
|1 |a |{"key1":"a","info":"%%replace%%"}|[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]|
+---+------+---------------------------------+-------------------------------------------------------+
And now you combine them together:
df2 = out.withColumn('final', F.expr('REPLACE(first, \'"%%replace%%"\', second)')).drop('first').drop('second')
df2.show(10, False)
+---+------+---------------------------------------------------------------------------+
|id |field1|final |
+---+------+---------------------------------------------------------------------------+
|3 |c |{"key1":"c","info":[{"number":"3","key2":"z"}]} |
|2 |b |{"key1":"b","info":[{"number":"2","key2":"y1"},{"number":"2","key2":"y2"}]}|
|1 |a |{"key1":"a","info":[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]}|
+---+------+---------------------------------------------------------------------------+
A bit unorthodox, but no complaints from Spark :)
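To show why the splice yields valid JSON, and where it can go wrong, here is a small sketch of the same trick in plain Python (no Spark involved). The helper name splice and the compact separators are my own choices to mimic the output above, not anything Spark provides:

```python
import json

PLACEHOLDER = '%%replace%%'

def splice(first, second, placeholder=PLACEHOLDER):
    """Replace the quoted placeholder in `first` with the raw JSON text `second`."""
    # json.dumps adds the surrounding double quotes, like '"%%replace%%"' in the REPLACE call.
    return first.replace(json.dumps(placeholder), second)

# Mimic the 'first' and 'second' columns for id 1.
first = json.dumps({'key1': 'a', 'info': PLACEHOLDER}, separators=(',', ':'))
second = json.dumps(
    [{'number': '1', 'key2': 'x1'}, {'number': '1', 'key2': 'x2'}],
    separators=(',', ':'),
)

final = splice(first, second)
parsed = json.loads(final)  # raises ValueError if the spliced text is not valid JSON
print(final)

# Caveat: the replacement is purely textual, so a data value equal to the
# placeholder gets replaced as well.
clash_first = json.dumps({'key1': PLACEHOLDER, 'info': PLACEHOLDER}, separators=(',', ':'))
clash = splice(clash_first, second)
print(clash)
```

So the trick is only safe as long as the placeholder string can never occur as a value in field1; pick something that is guaranteed not to appear in your data.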
Upvotes: 1