Reputation: 73
I have the source data as shown below:
{
id: 1,
y: [
{"firstName": "Will", "lastName": "Jackson"},
{"firstName": "Chris", "lastName": "Johnson"}
],
z: [
{"profession": "Actor"},
{"profession": "Comedian", "flag": "True"},
{"profession": "Fighter", "flag": "False"}
]
},
{
...
}
Is there a way I can explode the y and z columns of the JSON, given that each contains an array of JSON objects? I also want to read the profession column conditionally, based on flag. The flag key may be present or absent: if it is absent or False, read profession as NULL; if it is True, read the value. Ultimately, the above JSON should produce the table below:
id | firstName | lastName | profession | flag |
---|---|---|---|---|
1 | Will | Jackson | NULL | NULL |
1 | Chris | Johnson | NULL | NULL |
1 | Will | Jackson | Comedian | True |
1 | Chris | Johnson | Comedian | True |
1 | Will | Jackson | NULL | False |
1 | Chris | Johnson | NULL | False |
Upvotes: 0
Views: 79
Reputation: 6082
Just following your logic: explode both arrays, then transform flag and profession as you want:
from pyspark.sql import functions as F

(df
    .withColumn('y', F.explode('y'))   # one row per person
    .withColumn('z', F.explode('z'))   # cross-joined with one row per job
    .select('id', 'y.*', 'z.*')        # flatten both structs into columns
    .withColumn('profession', F
        # flag is read as a string; comparing with False casts it to boolean
        .when((F.col('flag').isNull()) | (F.col('flag') == False), None)
        .otherwise(F.col('profession'))
    )
    .show(10, False)
)
+---+---------+--------+-----+----------+
|id |firstName|lastName|flag |profession|
+---+---------+--------+-----+----------+
|1 |Will |Jackson |null |null |
|1 |Will |Jackson |True |Comedian |
|1 |Will |Jackson |False|null |
|1 |Chris |Johnson |null |null |
|1 |Chris |Johnson |True |Comedian |
|1 |Chris |Johnson |False|null |
+---+---------+--------+-----+----------+
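For readers less familiar with Spark, the same explode-then-mask logic can be sketched in plain Python; itertools.product plays the role of the double explode, and dict.get handles the possibly absent flag key. This is an illustrative sketch of the logic only, not part of the Spark answer:

```python
from itertools import product

record = {
    "id": 1,
    "y": [
        {"firstName": "Will", "lastName": "Jackson"},
        {"firstName": "Chris", "lastName": "Johnson"},
    ],
    "z": [
        {"profession": "Actor"},
        {"profession": "Comedian", "flag": "True"},
        {"profession": "Fighter", "flag": "False"},
    ],
}

rows = []
for person, job in product(record["y"], record["z"]):  # cross join of y and z
    flag = job.get("flag")                             # absent key -> None
    # keep profession only when flag is explicitly "True"
    profession = job["profession"] if flag == "True" else None
    rows.append((record["id"], person["firstName"],
                 person["lastName"], profession, flag))

for row in rows:
    print(row)
```

This yields the same six rows as the Spark output above, just grouped by person first rather than in table order.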
Upvotes: -1
Reputation: 336
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import explode, arrays_zip, col, when

spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

simple_json = """{"id": 1, "y": [ {"firstName": "Will", "lastName": "Jackson"}, {"firstName": "Chris", "lastName": "Johnson"} ],"z": [{"profession": "Actor"},{"profession": "Comedian", "flag": "True"},{"profession": "Fighter", "flag": "False"}]}"""

json_schema = StructType(fields=[
    StructField('id', StringType()),
    StructField('y', ArrayType(StructType([StructField('firstName', StringType()), StructField('lastName', StringType())]))),
    StructField('z', ArrayType(StructType([StructField('flag', StringType()), StructField('profession', StringType())])))])

# use the session's SparkContext instead of the legacy sc / sqlContext globals
rddjson = spark.sparkContext.parallelize([simple_json])
df = spark.read.json(rddjson)
newDF = spark.createDataFrame(df.rdd, schema=json_schema)

df3 = (newDF
    .select(newDF.id, newDF.y, newDF.z)
    .withColumn("name", explode(arrays_zip(newDF.y)))
    .withColumn("profession_orig", explode(arrays_zip(newDF.z)))
    # note: otherwise("NULL") emits the literal string "NULL", not a true null
    .withColumn("profession", when(col("profession_orig.z.flag") == "True",
                                   col("profession_orig.z.profession"))
                              .otherwise("NULL"))
    .select("id", "name.y.firstName", "name.y.lastName", "profession", "profession_orig.z.flag"))
df3.show()
Output:
+---+---------+--------+----------+-----+
| id|firstName|lastName|profession| flag|
+---+---------+--------+----------+-----+
| 1| Will| Jackson| NULL| null|
| 1| Will| Jackson| Comedian| True|
| 1| Will| Jackson| NULL|False|
| 1| Chris| Johnson| NULL| null|
| 1| Chris| Johnson| Comedian| True|
| 1| Chris| Johnson| NULL|False|
+---+---------+--------+----------+-----+
Upvotes: 0