Reputation: 31
def tranform_doc(docs):
json_list = []
print(docs)
for doc in docs:
json_doc = {}
json_doc["customKey"] = doc
json_list.append(json_doc)
return json_list
df.groupBy("colA") \
.agg(custom_udf(collect_list(col("colB"))).alias("customCol"))
First Hurdle:
Input: ["str1","str2","str3"]
Output: [{"customKey":"str1"},{"customKey":"str2"},{"customKey":"str3"}]
Second Hurdle:
columns in agg collect_list are changing dynamically. So, how to adjust schema dynamically.
when elements in list changes, receiving an error Input row doesn't have expected number of values required by the schema. 1 fields are required while 3 values are provided
What I did:
def tranform_doc(agg_docs):
return json_list
## When I failed to get a list of JSON I tried just return the original list of strings to the list of json
schema = StructType([{StructField("col1",StringType()),StructField("col2",StringType()),StructField("col3",StringType())}])
custom_udf = udf(tranform_doc,schema)
df.groupBy("colA") \
.agg(custom_udf(collect_list(col("colB"))).alias("customCol"))
Output I got: {"col2":"str1","col1":"str2","col3":"str3"}
Struggling to get the required list of JSON strings and to make it dynamical to number of elements in the list
Upvotes: 1
Views: 1694
Reputation: 42342
No UDF needed. You can convert colB
to a struct
before collect_list
.
import pyspark.sql.functions as F
df2 = df.groupBy('colA').agg(
F.to_json(
F.collect_list(
F.struct(F.col('colB').alias('customKey'))
)
).alias('output')
)
Upvotes: 2