Reputation: 15490
I need to get all columns of a Spark DataFrame and create another column containing a JSON object whose keys are the column names and whose values are the column values. For example, a DataFrame like this:
C1 | C2 | CN  |
---|----|-----|
10 | 20 | abc |
99 |    | cde |
40 | 50 |     |
Should be transformed to this:
C1 | C2 | CN  | JSON |
---|----|-----|------|
10 | 20 | abc | { "C1": 10, "C2": 20, "CN": "abc" } |
99 |    | cde | { "C1": 99, "CN": "cde" } |
40 | 50 |     | { "C1": 40, "C2": 50 } |
The column names and their number may vary, so I can't pass them explicitly. The strategy I'm using is:
import json

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StringType

def jsonize_fields(row):
    # Keep only the non-empty values, then serialize the row as a JSON string
    vars = {}
    for k, v in row.asDict().items():
        if v:
            vars[k] = v
    return json.dumps(vars)

jsonize_udf = udf(jsonize_fields, StringType())
spark_data_frame = spark_data_frame.withColumn(
    'JSON', jsonize_udf(struct(*spark_data_frame.columns)))
This works well, but it degrades performance a lot. So, I would like to convert it to a solution that doesn't use a UDF. Is that possible?
Upvotes: 0
Views: 203
Reputation: 15490
Just found it:
from pyspark.sql.functions import struct, to_json

spark_data_frame = spark_data_frame.withColumn(
    'JSON', to_json(struct(*spark_data_frame.columns)))
By default, to_json ignores null values (this can be changed by passing options={"ignoreNullFields": False} as the second parameter), but not empty ones.
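As a minimal sketch of keeping the nulls in the output (this assumes Spark 3.0+, where the ignoreNullFields JSON option is available):

from pyspark.sql.functions import struct, to_json

# Keep null columns in the JSON instead of dropping them
spark_data_frame = spark_data_frame.withColumn(
    'JSON',
    to_json(struct(*spark_data_frame.columns),
            options={"ignoreNullFields": False}))
# A null in C2 would then show up as "C2": null rather than being omitted.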
If you want to ignore empty string values as well, apply this first:
from pyspark.sql.functions import col, when

spark_data_frame = spark_data_frame.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c)
     for c in spark_data_frame.columns])
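Putting the two steps together, here is a small end-to-end sketch; the sample data and column names are illustrative only, and an active SparkSession named spark is assumed:

from pyspark.sql.functions import col, when, struct, to_json

df = spark.createDataFrame(
    [(10, "20", "abc"), (99, "", "cde"), (40, "50", None)],
    ["C1", "C2", "CN"])

# 1) Turn empty strings into nulls so to_json drops them along with real nulls
df = df.select(
    [when(col(c) == "", None).otherwise(col(c)).alias(c) for c in df.columns])

# 2) Serialize each row's non-null columns to a JSON string
df = df.withColumn("JSON", to_json(struct(*df.columns)))

# Resulting JSON column, row by row:
#   {"C1":10,"C2":"20","CN":"abc"}
#   {"C1":99,"CN":"cde"}
#   {"C1":40,"C2":"50"}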
Upvotes: 0
Reputation: 366
I don't believe there's a straightforward way to achieve this without using UDFs. However, PySpark actually has a built-in to_json function for turning a STRUCT into a JSON string, so you don't need to write your own.
Behind the scenes, to_json calls Spark's internal implementation of the function, which removes the overhead associated with Python UDFs and should improve performance.
The usage is very similar to your custom UDF:
from pyspark.sql.functions import struct, to_json
spark_data_frame = spark_data_frame.withColumn(
'JSON',
to_json(struct(*spark_data_frame.columns))
)
Upvotes: 1