Reputation: 195
I have a Databricks notebook that reads delta data in JSON format every hour. Let's say that at 11 AM the schema of the file is as follows:
root
 |-- number: string (nullable = true)
 |-- company: string (nullable = true)
 |-- assignment: struct (nullable = true)
 |    |-- link: string (nullable = true)
 |    |-- value: string (nullable = true)
The next hour, at 12 PM, the schema changes to:
root
 |-- number: string (nullable = true)
 |-- company: struct (nullable = true)
 |    |-- link: string (nullable = true)
 |    |-- value: string (nullable = true)
 |-- assignment: struct (nullable = true)
 |    |-- link: string (nullable = true)
 |    |-- value: string (nullable = true)
Some of the columns change from string to struct and vice versa. So if I select col("company.link") and the incoming column is of type string, the code fails. How do I handle schema changes in PySpark when reading the file? My end goal is to flatten the JSON to CSV format.
Upvotes: 1
Views: 805
Reputation: 195
This is what I did:

from pyspark.sql.functions import col

def get_dtype(df, colname):
    # Return the Spark type string (e.g. "string" or "struct<...>") of one column
    return [dtype for name, dtype in df.dtypes if name == colname][0]

# df has the exploded JSON data
df2 = df.select("result.number",
                "result.company",
                "result.assignment_group")
df23 = df2
for name, cols in df2.dtypes:
    # Only struct columns are flattened; string columns pass through unchanged
    if 'struct' in get_dtype(df2, name):
        try:
            df23 = (df23.withColumn(name + "_link", col(name + ".link"))
                        .withColumn(name + "_value", col(name + ".value"))
                        .drop(name))
        except Exception:
            print("error")
df23.printSchema()

root
 |-- number: string (nullable = true)
 |-- company: string (nullable = true)
 |-- assignment_group_link: string (nullable = true)
 |-- assignment_group_value: string (nullable = true)
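One limitation of the loop above is that the output columns still differ from hour to hour (company in one run, company_link and company_value in the next), which is awkward for a CSV target. A possible variation, sketched below, builds the select expressions from df.dtypes so that both cases yield the same flat columns. The helper name flatten_exprs and its maybe_struct and fields parameters are hypothetical, and putting a string value into the _value column while leaving _link null is an assumption about what the string represents, not something the data confirms.

```python
def flatten_exprs(dtypes, maybe_struct, fields=("link", "value")):
    """Build Spark SQL select expressions that produce the same flat columns
    whether each column in `maybe_struct` arrived as a struct or as a string.

    `dtypes` is a list of (name, type string) pairs, as returned by df.dtypes.
    """
    exprs = []
    for name, dtype in dtypes:
        if name not in maybe_struct:
            # Column never changes shape: pass it through untouched
            exprs.append(name)
        elif dtype.startswith("struct"):
            # Struct this hour: pull each expected field out by name
            exprs += [f"{name}.{f} AS {name}_{f}" for f in fields]
        else:
            # String this hour. Assumption: the scalar belongs in <name>_value;
            # the remaining fields become nulls so the CSV header stays stable.
            for f in fields:
                src = name if f == "value" else "CAST(NULL AS STRING)"
                exprs.append(f"{src} AS {name}_{f}")
    return exprs


# Schema from the 11 AM example: company is a string, assignment is a struct
dtypes_11am = [
    ("number", "string"),
    ("company", "string"),
    ("assignment", "struct<link:string,value:string>"),
]
exprs_11am = flatten_exprs(dtypes_11am, maybe_struct={"company", "assignment"})
for e in exprs_11am:
    print(e)
```

With a real DataFrame this would be used as df2.selectExpr(*flatten_exprs(df2.dtypes, {"company", "assignment_group"})). The sketch assumes that a struct always carries both link and value; a struct missing one of those fields would still fail at selection time.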
Upvotes: 1