Reputation: 11
I am setting up an Azure Databricks delta lake and I am struggling to load my json data into it. We have hundreds of different json file formats, all stored in a data lake.
Now I am trying to avoid writing 100 different python notebooks and instead build one metadata-driven notebook that can handle all the different json formats.
I am able to get the first batch of data into the delta lake, so far so good. The problem is when the first json file I load into a specific delta table holds NULL in a column. The write to the delta lake then automatically creates this column as string. The next file holds a nested json object in that same column, and I get this error message:
AnalysisException: Failed to merge fields 'payment_info' and 'payment_info'. Failed to merge incompatible data types StringType and StructType(StructField(@type,StringType,true),StructField(bank_name,StringType,true),StructField(bic,StringType,true),StructField(iban,StringType,true),StructField(owner,StringType,true))
This is my data. First the json would look like:
{
  "payment_info": null,
  "shop_id": 1,
  "shop_name": "Testshop",
  "shop_state": "OPEN"
}
Then the second json file holds this info:
{
  "payment_info": {
    "@type": "IBAN",
    "bank_name": "bankName",
    "bic": "ABCD12345",
    "owner": "baName"
  },
  "shop_id": 2,
  "shop_name": "Another TestShop",
  "shop_state": "OPEN"
}
This is the relevant part of the code I guess:
jsondf = spark.read.option("multiline", "true") \
    .format("json") \
    .option("mergeSchema", "true") \
    .load(load_path)

jsondf.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable(table_name)
I would be perfectly happy if I could just create the initial delta table without the columns that are NULL for all records. That way schema evolution would work when they later arrive populated. I don't have any situations where a column first comes as int and then as string, or first as string and then as a complex struct.
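Something along these lines is what I am after, i.e. dropping the all-NULL columns before the first write (just a rough, untested sketch of the idea, reusing load_path and table_name from above):

from pyspark.sql import functions as F

jsondf = spark.read.option("multiline", "true") \
    .format("json") \
    .load(load_path)

# Count the non-null values per column in a single pass
non_null_counts = jsondf.select(
    [F.count(F.col(c)).alias(c) for c in jsondf.columns]
).first()

# Keep only the columns that actually hold data in this batch,
# so schema evolution can add the others later when they arrive populated
cols_to_keep = [c for c in jsondf.columns if non_null_counts[c] > 0]

jsondf.select(cols_to_keep).write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable(table_name)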
I have also tried putting it in a temp-table in between:
jsondf.createOrReplaceTempView("tempview")
sourcedf = spark.sql("select * from tempview")
and using a merge statement instead:
spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql("MERGE INTO " + table_name + " as t " + \
"USING " + batch_table_name + " as s " + \
"ON " + joinsql + " " + \
"WHEN MATCHED THEN " + \
"UPDATE SET * " + \
"WHEN NOT MATCHED THEN " + \
"INSERT *")
Unfortunately it all ends up with the same result.
So, is there any easy way to drop all columns that are null? Or is there another way?
Perhaps I could use a dummy json file with all the fields populated to get the schema definition for the table? That would be doable, whereas manually defining the schema would be very time-consuming.
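E.g. something like this, where dummy_path is just a placeholder for wherever I would keep that hand-made file (untested sketch):

# Infer the full schema once from a dummy file that has every field populated
# (dummy_path is a placeholder)
dummy_schema = spark.read.option("multiline", "true") \
    .format("json") \
    .load(dummy_path) \
    .schema

# Enforce that schema when reading the real files, so columns that are NULL
# in the batch still come out with the right (struct) type
jsondf = spark.read.option("multiline", "true") \
    .schema(dummy_schema) \
    .format("json") \
    .load(load_path)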
Cheers!
Upvotes: 1
Views: 4115
Reputation: 245
This is two months old but here is how I approached this same problem.
Read all the json files in the directory, then save the inferred schema to a file that a separate job can reference when it reads the directory and writes the Delta table.
from pyspark.sql.types import StructType
import json

# Infer the schema once by reading all json files in the directory
file_schema = spark.read.format("json").load("s3a://dir/*.json")
file_schema.printSchema()

# Persist the inferred schema as json so other jobs can reuse it
with open("/dbfs/FileStore/schemas/schema.json", "w") as f:
    json.dump(file_schema.schema.jsonValue(), f)
Now in the new job I load the schema file and use it on the read:
with open("/dbfs/FileStore/schemas/schema.json") as f:
    the_schema = StructType.fromJson(json.load(f))
You can then reference it in the schema option
file_reader = spark.readStream.format('json') \
    .schema(the_schema) \
    .load(your_path_to_files)
This is a scrubbed-down version, but it should point you in the right direction and gives you a managed schema you can reference. If you have files with the same field names but different value types, you will want that field saved as a string.
However, for your use case I would suggest just using Auto Loader: let it store the fields as strings and apply your transformations downstream (see the sketch below). I use it for 250k new json files a day with a schema of over 350 fields.
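A rough sketch of what that could look like (paths and table name are placeholders, not from my actual job):

# Auto Loader ingests new json files incrementally; by default it infers
# every column as string (cloudFiles.inferColumnTypes is off), which avoids
# the null-vs-struct merge conflict entirely.
# All paths and the table name below are placeholders.
raw = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/mnt/schemas/shops") \
    .load("/mnt/raw/shops/")

raw.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/mnt/checkpoints/shops") \
    .option("mergeSchema", "true") \
    .toTable("shops")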
Upvotes: 0