Reputation: 163
I have a DataFrame with columns col1 and col2, where col2 can contain either a JSON string or a plain string. If it contains a parsable JSON string, I need to extract the keys and values into separate columns as lists; otherwise it should return empty lists in the third and fourth columns.
I am using PySpark to achieve this. Any help is appreciated.
Source DataFrame:
+-----+----------------------------------------------+
| col1| col2                                         |
+-----+----------------------------------------------+
|a    |{"key1":"val1","key2":"val2"}                 |
|b    |{"key5":"val5", "key6":"val6", "key7":"val7"} |
|c    |"just a string"                               |
|d    |null                                          |
+-----+----------------------------------------------+
Desired DataFrame:
+-----+----------------+----------------+
| col1| keys           | values         |
+-----+----------------+----------------+
|a    |[key1,key2]     |[val1,val2]     |
|b    |[key5,key6,key7]|[val5,val6,val7]|
|c    |[]              |[]              |
|d    |[]              |[]              |
+-----+----------------+----------------+
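For reference, the sample data above could be built roughly like this (a sketch, assuming an active SparkSession named spark):
df = spark.createDataFrame(
    [
        ("a", '{"key1":"val1","key2":"val2"}'),
        ("b", '{"key5":"val5", "key6":"val6", "key7":"val7"}'),
        ("c", '"just a string"'),
        ("d", None),
    ],
    ["col1", "col2"],
)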
Upvotes: 1
Views: 5327
Reputation: 32700
Old question, but I don't really like the other answers which suggest using a UDF for this.
For Spark 2.3+, you can use the from_json function to convert the JSON strings into a map type, then use the map_keys function to get the keys and the map_values function to get the values:
from pyspark.sql.functions import from_json, map_keys, map_values

# Parse col2 as map<string,string>; values that aren't valid JSON objects become null
df1 = df.withColumn('col2', from_json('col2', 'map<string,string>')) \
    .withColumn('keys', map_keys('col2')) \
    .withColumn('values', map_values('col2')) \
    .select('col1', 'keys', 'values')

df1.show(truncate=False)
#+----+------------------+------------------+
#|col1|keys |values |
#+----+------------------+------------------+
#|a |[key1, key2] |[val1, val2] |
#|b |[key5, key6, key7]|[val5, val6, val7]|
#|c |null |null |
#|d |null |null |
#+----+------------------+------------------+
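If you need the empty lists from the question rather than nulls for the non-JSON rows, one option (a sketch, not part of the original code) is to coalesce the null arrays with an empty array literal:
from pyspark.sql.functions import array, coalesce, col

# Replace the nulls produced for non-JSON/null rows with empty string arrays
empty = array().cast('array<string>')
df2 = df1.withColumn('keys', coalesce(col('keys'), empty)) \
    .withColumn('values', coalesce(col('values'), empty))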
Upvotes: 3
Reputation: 16720
In JSONPath, the keys are $[*~] and the values are $[*]. But that doesn't look like it's supported by get_json_object.
So we need user-defined functions:
import json

from pyspark.sql.types import ArrayType, StringType

def json_keys(s):
    try:
        # Parse the JSON string and return its keys as a list
        return list(json.loads(s).keys())
    except Exception:
        # Not parsable JSON (or not a JSON object): return null
        return None

spark.udf.register("json_keys", json_keys, ArrayType(StringType()))

def json_values(s):
    try:
        # Parse the JSON string and return its values as a list
        return list(json.loads(s).values())
    except Exception:
        return None

spark.udf.register("json_values", json_values, ArrayType(StringType()))

df.selectExpr("col1", "json_keys(col2) keys", "json_values(col2) values").show()
Which yields:
+----+------------------+------------------+
|col1|              keys|            values|
+----+------------------+------------------+
|   a|      [key1, key2]|      [val1, val2]|
|   b|[key5, key6, key7]|[val5, val6, val7]|
|   c|              null|              null|
|   d|              null|              null|
+----+------------------+------------------+
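To get the empty lists from the question instead of nulls, the except branches could return an empty list (a minimal variation, shown here for json_keys; the same change applies to json_values):
def json_keys(s):
    try:
        return list(json.loads(s).keys())
    except Exception:
        # Return an empty list instead of null for non-JSON rows,
        # matching the desired output in the question
        return []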
Upvotes: 1
Reputation: 713
You can use the PySpark function explode from the SQL functions module:
From the docs:
pyspark.sql.functions.explode(col) — Returns a new row for each element in the given array or map. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise.
from pyspark.sql import Row
from pyspark.sql.functions import explode
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
| a| b|
+---+-----+
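Applied to the question's data, explode could be combined with from_json to get one (key, value) row per map entry (a sketch, assuming Spark 2.3+; note that rows where col2 is not valid JSON parse to null and are simply dropped by explode rather than kept with empty lists):
from pyspark.sql.functions import explode, from_json

# Parse col2 into a map, then explode each map entry into its own row
exploded = df.withColumn('m', from_json('col2', 'map<string,string>')) \
    .select('col1', explode('m').alias('key', 'value'))
exploded.show()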
Upvotes: 0