Reputation: 1092
I have a Delta table with a column that holds JSON data. I do not have a schema for it and need a way to convert the JSON data into columns.
|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}
Expected output
|id | name | depts | sal | address_city
| 1 | "abc" | ["dep01", "dep02"] | null| null
| 2 | "xyz" | ["dep03"] | 100 | null
| 3 | "pqr" | ["dep02"] | null| "SF"
Upvotes: 2
Views: 7801
Reputation: 248
To dynamically parse and promote the properties of a JSON string column without a known schema, I am afraid that, as far as I know, you cannot use PySpark; it can be done in Scala.
For example, suppose you have some Avro files produced by Kafka and you want to dynamically parse the Value column, which is a serialized JSON string:
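import spark.implicits._  // provides the String encoder for .as[String]; notebooks often pre-import it
val df = spark.read.format("avro").load("abfss://[email protected]/xyz.avro").select("Value")
// Treat the single column as a Dataset[String] and let the JSON reader infer the schema
val df_parsed = spark.read.json(df.as[String])
display(df_parsed)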
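The key is spark.read.json(df.as[String]) in Scala: it basically converts the single-column DataFrame to a Dataset[String] and then parses those JSON strings with the standard reader, which infers the schema. So far, as far as I know, no equivalent method is exposed to PySpark.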
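To make the idea of inferring a schema from a column of JSON strings concrete, here is a minimal plain-Python sketch. It is not Spark's implementation: the `rows` list and the `infer_fields` helper are hypothetical names introduced for illustration, and it only looks at top-level fields.

```python
import json

# Hypothetical sample rows standing in for the JSON string column.
rows = [
    '{"name":"abc", "depts":["dep01", "dep02"]}',
    '{"name":"xyz", "depts":["dep03"],"sal":100}',
    '{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}',
]

def infer_fields(json_strings):
    """Map each top-level field name to the Python type names seen for it."""
    seen = {}
    for text in json_strings:
        for key, value in json.loads(text).items():
            seen.setdefault(key, set()).add(type(value).__name__)
    # Sort field names so the result is stable across runs.
    return {key: sorted(types) for key, types in sorted(seen.items())}

fields = infer_fields(rows)
print(fields)
```

The point of the sketch is that every row has to be scanned before the full set of columns is known, which is why a schema-less read needs a pass over the data.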
Upvotes: 0
Reputation: 1739
Input Dataframe -
df = spark.createDataFrame(data = [(1 , """{"name":"abc", "depts":["dep01", "dep02"]}"""), (2 , """{"name":"xyz", "depts":["dep03"],"sal":100}"""), (3 , """{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}""")], schema = ["id", "json_data"])
df.show(truncate=False)
+---+----------------------------------------------------------+
|id |json_data |
+---+----------------------------------------------------------+
|1 |{"name":"abc", "depts":["dep01", "dep02"]} |
|2 |{"name":"xyz", "depts":["dep03"],"sal":100} |
|3 |{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}|
+---+----------------------------------------------------------+
Convert the json_data column to MapType as below -
from pyspark.sql.functions import explode, explode_outer, first, from_json
from pyspark.sql.types import MapType, StringType
df1 = df.withColumn("cols", from_json("json_data", MapType(StringType(), StringType()))).drop("json_data")
df1.show(truncate=False)
+---+-----------------------------------------------------------+
|id |cols |
+---+-----------------------------------------------------------+
|1 |{name -> abc, depts -> ["dep01","dep02"]} |
|2 |{name -> xyz, depts -> ["dep03"], sal -> 100} |
|3 |{name -> pqr, depts -> ["dep02"], address -> {"city":"SF"}}|
+---+-----------------------------------------------------------+
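As a rough plain-Python analogue of what a string-to-string map does to each row (a sketch only, not how Spark implements from_json; `to_string_map` is a hypothetical helper), top-level keys are kept and any non-string value stays as JSON text:

```python
import json

def to_string_map(json_text):
    """Parse one JSON object and keep every top-level value as a string."""
    parsed = json.loads(json_text)
    return {
        # Strings pass through; arrays, numbers and objects are re-serialized.
        key: value if isinstance(value, str)
        else json.dumps(value, separators=(",", ":"))
        for key, value in parsed.items()
    }

row = '{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}'
cols = to_string_map(row)
print(cols)
```

This is why nested values such as address are still unparsed JSON text at this stage and need a second pass later.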
Now, the cols column needs to be exploded as below -
df2 = df1.select("id",explode("cols").alias("col_columns", "col_rows"))
df2.show(truncate=False)
+---+-----------+-----------------+
|id |col_columns|col_rows |
+---+-----------+-----------------+
|1 |name |abc |
|1 |depts |["dep01","dep02"]|
|2 |name |xyz |
|2 |depts |["dep03"] |
|2 |sal |100 |
|3 |name |pqr |
|3 |depts |["dep02"] |
|3 |address |{"city":"SF"} |
+---+-----------+-----------------+
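The explode step can be pictured in plain Python as turning each (id, map) pair into one row per map entry. This is only an illustrative sketch; `explode_map` and the `maps` sample are hypothetical names, not part of any Spark API.

```python
def explode_map(rows):
    """Yield one (id, key, value) tuple per entry of each row's map."""
    for row_id, mapping in rows:
        for key, value in mapping.items():
            yield (row_id, key, value)

# Hypothetical input: ids paired with string-to-string maps.
maps = [
    (1, {"name": "abc", "depts": '["dep01","dep02"]'}),
    (2, {"name": "xyz", "depts": '["dep03"]', "sal": "100"}),
]

long_rows = list(explode_map(maps))
for r in long_rows:
    print(r)
```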
Once you have col_columns and col_rows as individual columns, all that is needed is to pivot col_columns and aggregate it using the first of its corresponding col_rows values, as below -
df3 = df2.groupBy("id").pivot("col_columns").agg(first("col_rows"))
df3.show(truncate=False)
+---+-------------+-----------------+----+----+
|id |address |depts |name|sal |
+---+-------------+-----------------+----+----+
|1 |null |["dep01","dep02"]|abc |null|
|2 |null |["dep03"] |xyz |100 |
|3 |{"city":"SF"}|["dep02"] |pqr |null|
+---+-------------+-----------------+----+----+
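A plain-Python sketch of the same pivot-and-take-first logic may help show where the nulls come from. `pivot_first` and the sample `long_rows` are hypothetical names for illustration, and the column names are sorted here to mirror the order seen in the output above.

```python
def pivot_first(long_rows):
    """Turn (id, key, value) rows into one record per id, one column per key."""
    columns = sorted({key for _, key, _ in long_rows})
    table = {}
    for row_id, key, value in long_rows:
        # Every record starts with all columns set to None (the "null" cells).
        record = table.setdefault(row_id, dict.fromkeys(columns))
        if record[key] is None:  # keep only the first value seen per cell
            record[key] = value
    return columns, table

long_rows = [
    (1, "name", "abc"), (1, "depts", '["dep01","dep02"]'),
    (2, "name", "xyz"), (2, "depts", '["dep03"]'), (2, "sal", "100"),
    (3, "name", "pqr"), (3, "depts", '["dep02"]'), (3, "address", '{"city":"SF"}'),
]

columns, table = pivot_first(long_rows)
print(columns)
print(table[3])
```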
Finally, you need to repeat the above steps to bring address into a structured format as below -
df4 = df3.withColumn("address", from_json("address", MapType(StringType(), StringType())))
df4.select("id", "depts", "name", "sal",explode_outer("address").alias("key", "address_city")).drop("key").show(truncate=False)
+---+-----------------+----+----+------------+
|id |depts |name|sal |address_city|
+---+-----------------+----+----+------------+
|1 |["dep01","dep02"]|abc |null|null |
|2 |["dep03"] |xyz |100 |null |
|3 |["dep02"] |pqr |null|SF |
+---+-----------------+----+----+------------+
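If the nesting goes deeper than one level, repeating the steps by hand gets tedious. The naming rule itself (parent_child, as in address_city) can be sketched recursively in plain Python. `flatten` is a hypothetical helper, and this only illustrates the naming logic rather than a Spark transformation.

```python
import json

def flatten(obj, prefix=""):
    """Flatten nested JSON objects into a single dict with parent_child keys."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the parent name along.
            flat.update(flatten(value, name))
        else:
            # Arrays and scalars are kept as they are.
            flat[name] = value
    return flat

record = json.loads('{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}')
flat = flatten(record)
print(flat)
```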
Upvotes: 5
Reputation: 350
To solve it, you can use the split function, as in the code below.
The function takes two parameters: the first is the column itself and the second is the pattern on which to split the column's string into array elements.
More information and examples can be found here:
from pyspark.sql import functions as F
df.select(F.split(F.col('depts'), ','))
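One thing worth checking before relying on a comma split: if depts holds JSON text such as ["dep01","dep02"] (an assumption based on the output in the earlier answer), the brackets and quotes stay attached to the pieces. A plain-Python sketch, with `re.split` standing in for a pattern-based split:

```python
import json
import re

# Assumed shape of one depts value: an array stored as JSON text.
depts_text = '["dep01","dep02"]'

# Splitting on the pattern "," leaves brackets and quotes in the pieces.
pieces = re.split(",", depts_text)

# Parsing the text as JSON yields the clean element values.
parsed = json.loads(depts_text)

print(pieces)
print(parsed)
```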
Upvotes: 0