Reputation: 677
I have a dataframe in PySpark with 3 columns - json, date and object_id:
-----------------------------------------------------------------------------------------
|json                                                              |date      |object_id|
-----------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-01|xyz123   |
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,'e':0,'f':2}}|2020-08-02|xyz123   |
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}            |2020-08-03|xyz123   |
-----------------------------------------------------------------------------------------
Now I have a list of variables: [a.c.60, a.n.60, a.d, g.h]. I need to extract only these variables from the json column of the dataframe above and add them as new columns with their respective values.
So in the end, the dataframe should look like:
-------------------------------------------------------------------------------------------------------
|json                                                    |date      |object_id|a.c.60|a.n.60|a.d |g.h |
-------------------------------------------------------------------------------------------------------
|{'a':{'b':0,'c':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-01|xyz123   |0     |null  |0.01|null|
|{'a':{'m':0,'n':{'50':0.005,'60':0,'100':0},'d':0.01,...|2020-08-02|xyz123   |null  |0     |0.01|null|
|{'g':{'h':0,'j':{'50':0.005,'80':0,'100':0},'d':0.02}}  |2020-08-03|xyz123   |null  |null  |0.02|0   |
-------------------------------------------------------------------------------------------------------
Please help me get this result dataframe. The main problem is that the incoming JSON data has no fixed structure: the JSON can be arbitrarily nested, but I only need to extract the four given variables. I have achieved this in Pandas by flattening the JSON string and then extracting the 4 variables, but in Spark it is proving difficult.
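For reference, a minimal sketch of the Pandas flattening approach I described (here using pd.json_normalize, though the exact flattening step may differ):

import json
import pandas as pd

rows = ['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
        '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
        '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}']

# json_normalize flattens nested keys into dotted column names like "a.c.60"
flat = pd.json_normalize([json.loads(r) for r in rows])

# keep only the four requested variables; anything missing becomes NaN
flat = flat.reindex(columns=["a.c.60", "a.n.60", "a.d", "g.h"])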
Upvotes: 8
Views: 10553
Reputation: 87069
There are 2 ways to do it:
1. Use the get_json_object function, like this:

from pyspark.sql.types import StringType
import pyspark.sql.functions as F
# sample data; the single string column is named "value" by default
df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())

# extract each variable by its JSON path; missing paths yield null
df3 = df.select(F.get_json_object(F.col("value"), "$.a.c.60").alias("a_c_60"),
                F.get_json_object(F.col("value"), "$.a.n.60").alias("a_n_60"),
                F.get_json_object(F.col("value"), "$.a.d").alias("a_d"),
                F.get_json_object(F.col("value"), "$.g.h").alias("g_h"))
will give:
>>> df3.show()
+------+------+----+----+
|a_c_60|a_n_60| a_d| g_h|
+------+------+----+----+
| 0| null|0.01|null|
| null| 0|0.01|null|
| null| null|null| 0|
+------+------+----+----+
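To add these as columns on the original dataframe instead of selecting them alone, the same expressions can be attached with withColumn - a rough sketch, assuming the dataframe is called input_df and its JSON column is named json (both names come from the question, not from tested code):

# Sketch: keep json, date and object_id, and add the extracted values alongside them.
# input_df and the "json" column name are assumptions based on the question.
paths = {"a_c_60": "$.a.c.60",
         "a_n_60": "$.a.n.60",
         "a_d":    "$.a.d",
         "g_h":    "$.g.h"}

result = input_df
for name, path in paths.items():
    result = result.withColumn(name, F.get_json_object(F.col("json"), path))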
2. Use the from_json function with a schema, and then extract the individual values from the resulting structures - this could be more performant than JSON Path:

from pyspark.sql.types import *
import pyspark.sql.functions as F
# schema covering only the fields we need; all other keys in the JSON are ignored
aSchema = StructType([
    StructField("c", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("n", StructType([
        StructField("60", DoubleType(), True)
    ]), True),
    StructField("d", DoubleType(), True),
])
gSchema = StructType([
    StructField("h", DoubleType(), True)
])
schema = StructType([
    StructField("a", aSchema, True),
    StructField("g", gSchema, True)
])
df = spark.createDataFrame(['{"a":{"b":0,"c":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"a":{"m":0,"n":{"50":0.005,"60":0,"100":0},"d":0.01,"e":0,"f":2}}',
                            '{"g":{"h":0,"j":{"50":0.005,"80":0,"100":0},"d":0.02}}'],
                           StringType())
# parse the JSON strings with the schema, then flatten the top-level struct
df2 = df.select(F.from_json("value", schema=schema).alias('data')).select('data.*')
df2.select(df2.a.c['60'], df2.a.n['60'], df2.a.d, df2.g.h).show()
will give:
+------+------+----+----+
|a.c.60|a.n.60| a.d| g.h|
+------+------+----+----+
| 0.0| null|0.01|null|
| null| 0.0|0.01|null|
| null| null|null| 0.0|
+------+------+----+----+
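The same schema can be applied to the original dataframe while keeping the other columns - again a sketch, assuming an input_df with a json column as described in the question:

# Sketch: parse the assumed "json" column with the schema above and keep date/object_id.
parsed = input_df.withColumn("data", F.from_json(F.col("json"), schema))

result = parsed.select("json", "date", "object_id",
                       parsed.data.a.c["60"].alias("a_c_60"),
                       parsed.data.a.n["60"].alias("a_n_60"),
                       parsed.data.a.d.alias("a_d"),
                       parsed.data.g.h.alias("g_h"))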
Upvotes: 15