John Constantine

Reputation: 1092

Split JSON string column to multiple columns without schema - PySpark

I have a Delta table which has a column with JSON data. I do not have a schema for it and need a way to convert the JSON data into columns:

| id | json_data                                                   |
| 1  | {"name":"abc", "depts":["dep01", "dep02"]}                  |
| 2  | {"name":"xyz", "depts":["dep03"],"sal":100}                 |
| 3  | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}  |

Expected output

| id | name  | depts              | sal  | address_city |
| 1  | "abc" | ["dep01", "dep02"] | null | null         |
| 2  | "xyz" | ["dep03"]          | 100  | null         |
| 3  | "pqr" | ["dep02"]          | null | "SF"         |

Upvotes: 2

Views: 7801

Answers (3)

jeffrey

Reputation: 248

To dynamically parse and promote the properties from a JSON string column without a known schema, I am afraid you cannot use PySpark; it can be done using Scala.

For example, when you have some Avro files produced by Kafka and you want to dynamically parse the Value column, which is a serialized JSON string:

// Load the Avro files and keep only the column holding the serialized JSON
var df = spark.read.format("avro").load("abfss://[email protected]/xyz.avro").select("Value")
// Let Spark infer the schema by reading the JSON strings directly
var df_parsed = spark.read.json(df.as[String])
display(df_parsed)

The key is spark.read.json(df.as[String]) in Scala. It basically:

  1. Converts that DataFrame (which has only one column of interest in this case; you can of course handle multiple columns of interest similarly and union whatever you want) to a Dataset[String].
  2. Parses the JSON strings using the standard Spark read path, which infers the schema and does not require one up front.

As far as I know, there is no equivalent method exposed in PySpark so far.

Upvotes: 0

Dipanjan Mallick

Reputation: 1739

Input DataFrame -

df = spark.createDataFrame(
    data=[
        (1, """{"name":"abc", "depts":["dep01", "dep02"]}"""),
        (2, """{"name":"xyz", "depts":["dep03"],"sal":100}"""),
        (3, """{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}"""),
    ],
    schema=["id", "json_data"],
)
df.show(truncate=False)

+---+----------------------------------------------------------+
|id |json_data                                                 |
+---+----------------------------------------------------------+
|1  |{"name":"abc", "depts":["dep01", "dep02"]}                |
|2  |{"name":"xyz", "depts":["dep03"],"sal":100}               |
|3  |{"name":"pqr", "depts":["dep02"], "address":{"city":"SF"}}|
+---+----------------------------------------------------------+

Convert json_data column to MapType as below -

from pyspark.sql.functions import *
from pyspark.sql.types import *

df1 = df.withColumn("cols", from_json("json_data", MapType(StringType(), StringType()))).drop("json_data")
df1.show(truncate=False)

+---+-----------------------------------------------------------+
|id |cols                                                       |
+---+-----------------------------------------------------------+
|1  |{name -> abc, depts -> ["dep01","dep02"]}                  |
|2  |{name -> xyz, depts -> ["dep03"], sal -> 100}              |
|3  |{name -> pqr, depts -> ["dep02"], address -> {"city":"SF"}}|
+---+-----------------------------------------------------------+

Now, column cols needs to be exploded as below -

df2 = df1.select("id",explode("cols").alias("col_columns", "col_rows"))
df2.show(truncate=False)

+---+-----------+-----------------+
|id |col_columns|col_rows         |
+---+-----------+-----------------+
|1  |name       |abc              |
|1  |depts      |["dep01","dep02"]|
|2  |name       |xyz              |
|2  |depts      |["dep03"]        |
|2  |sal        |100              |
|3  |name       |pqr              |
|3  |depts      |["dep02"]        |
|3  |address    |{"city":"SF"}    |
+---+-----------+-----------------+

Once you have col_columns and col_rows as individual columns, all that is left to do is pivot on col_columns and aggregate each group with the first corresponding col_rows value, as below -

df3 = df2.groupBy("id").pivot("col_columns").agg(first("col_rows"))
df3.show(truncate=False)

+---+-------------+-----------------+----+----+
|id |address      |depts            |name|sal |
+---+-------------+-----------------+----+----+
|1  |null         |["dep01","dep02"]|abc |null|
|2  |null         |["dep03"]        |xyz |100 |
|3  |{"city":"SF"}|["dep02"]        |pqr |null|
+---+-------------+-----------------+----+----+

Finally, you need to repeat the above steps once more to bring address into a structured format, as below -

df4 = df3.withColumn("address", from_json("address", MapType(StringType(), StringType())))
df4.select("id", "depts", "name", "sal",explode_outer("address").alias("key", "address_city")).drop("key").show(truncate=False)

+---+-----------------+----+----+------------+
|id |depts            |name|sal |address_city|
+---+-----------------+----+----+------------+
|1  |["dep01","dep02"]|abc |null|null        |
|2  |["dep03"]        |xyz |100 |null        |
|3  |["dep02"]        |pqr |null|SF          |
+---+-----------------+----+----+------------+
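For reference, here is the same approach chained end to end as a minimal sketch (it assumes the same input DataFrame df and column names used above) -

from pyspark.sql.functions import explode, explode_outer, first, from_json
from pyspark.sql.types import MapType, StringType

# parse the JSON string into a map, explode it to key/value rows, then pivot the keys back into columns
wide = (df
        .withColumn("cols", from_json("json_data", MapType(StringType(), StringType())))
        .select("id", explode("cols").alias("col_columns", "col_rows"))
        .groupBy("id").pivot("col_columns").agg(first("col_rows")))

# repeat the map/explode step once more for the nested address object
result = (wide
          .withColumn("address", from_json("address", MapType(StringType(), StringType())))
          .select("id", "depts", "name", "sal",
                  explode_outer("address").alias("key", "address_city"))
          .drop("key"))

result.show(truncate=False)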

Upvotes: 5

danimille

Reputation: 350

To solve this, you can use the split function, as in the code below.

The function takes two parameters: the first is the column itself, and the second is the pattern used to split the string into the elements of the array.

More information and examples can be found here:

https://sparkbyexamples.com/pyspark/pyspark-convert-string-to-array-column/#:~:text=PySpark%20SQL%20provides%20split(),and%20converting%20it%20into%20ArrayType.

from pyspark.sql import functions as F

# split the string column on "," to get an ArrayType column
df.select(F.split(F.col('depts'), ','))
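For illustration, a minimal self-contained sketch with hypothetical data (df_ex here is a made-up DataFrame where depts is stored as a plain comma-separated string rather than inside JSON) -

from pyspark.sql import functions as F

# hypothetical input where depts is a comma-separated string
df_ex = spark.createDataFrame([(1, "dep01,dep02"), (2, "dep03")], ["id", "depts"])

# split on "," to turn the string into an ArrayType column
df_ex.select("id", F.split(F.col("depts"), ",").alias("depts_array")).show(truncate=False)

+---+--------------+
|id |depts_array   |
+---+--------------+
|1  |[dep01, dep02]|
|2  |[dep03]       |
+---+--------------+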

Upvotes: 0
