roosm

Reputation: 11

Dynamic schema evolution of JSON files into Delta Lake

I am setting up an Azure Databricks Delta Lake and I am struggling to load my JSON data into it. We have hundreds of different JSON file formats, all stored in a data lake.

Now I am trying to avoid writing hundreds of different Python notebooks and instead build one metadata-driven notebook that can handle all the different JSON formats.

I am able to get the first batch of data into Delta Lake, so far so good. The problem arises when the first JSON file I load into a specific Delta table holds NULL in a column: the write to Delta Lake then creates that column as a string. When the next file holds a nested JSON object in that same column, I get this error message:

AnalysisException: Failed to merge fields 'payment_info' and 'payment_info'. Failed to merge incompatible data types StringType and StructType(StructField(@type,StringType,true),StructField(bank_name,StringType,true),StructField(bic,StringType,true),StructField(iban,StringType,true),StructField(owner,StringType,true))

This is my data. The first JSON file looks like:

{
  "payment_info": null,
  "shop_id": 1,
  "shop_name": "Testshop",
  "shop_state": "OPEN"
}

Then the second JSON file holds this:

{
  "payment_info": {
    "@type": "IBAN",
    "bank_name": "bankName",
    "bic": "ABCD12345",
    "owner": "baName"
  },
  "shop_id": 2,
  "shop_name": "Another TestShop",
  "shop_state": "OPEN"
}

This is the relevant part of the code, I guess:

jsondf = spark.read.option("multiline","true") \
  .format("json") \
  .option("mergeSchema", "true") \
  .load(load_path)

jsondf.write \
  .format("delta") \
  .mode("append") \
  .option("mergeSchema", "true") \
  .saveAsTable(table_name)

I would be perfectly happy if I could just create the initial Delta table without the columns that are NULL for all records. That way schema evolution would work once they later arrive populated. I don't have any situations where a column first comes as an int and then as a string, or first as a string and then as a complex struct.
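
Something like this rough sketch is what I have in mind (untested, and the count-based check is just my own idea):

from pyspark.sql.functions import col, count

# Count non-null values per column and drop the columns that are NULL everywhere
non_null_counts = jsondf.select(
    [count(col(c)).alias(c) for c in jsondf.columns]
).collect()[0].asDict()

all_null_cols = [c for c, n in non_null_counts.items() if n == 0]
cleaned_df = jsondf.drop(*all_null_cols)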

I have also tried putting the data in a temp view in between:

jsondf.createOrReplaceTempView("tempview")
sourcedf = spark.sql("select * from tempview")

and using a MERGE statement instead:

spark.sql("set spark.databricks.delta.schema.autoMerge.enabled=true")
spark.sql("MERGE INTO " + table_name + " as t " + \
  "USING " + batch_table_name + " as s " + \
  "ON " + joinsql + " " + \
  "WHEN MATCHED THEN " + \
     "UPDATE SET * " + \
  "WHEN NOT MATCHED THEN " + \
     "INSERT *")

Unfortunately it all ends up with the same result.

So, is there any easy way to drop all columns that are null? Or is there another way?

Perhaps I could use a dummy JSON file with all the fields populated to get the schema definition for the table? That would be doable, whereas manually defining the schemas would be very time-consuming.
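
For example, something like this (the dummy file path is just a placeholder):

# Read a hand-made dummy file only to capture a fully populated schema,
# then reuse that schema when reading the real files
dummy_schema = spark.read.format("json") \
  .option("multiline", "true") \
  .load("/mnt/schemas/shop_dummy.json") \
  .schema

jsondf = spark.read.format("json") \
  .option("multiline", "true") \
  .schema(dummy_schema) \
  .load(load_path)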

Cheers!

Upvotes: 1

Views: 4115

Answers (1)

dotcomken

Reputation: 245

This is two months old, but here is how I approached the same problem.

Read all the JSON files in the directory, then save the schema to a file so it can be referenced in a separate job that reads the directory and writes the Delta table.

from pyspark.sql.types import StructType
import json

# Infer the schema from every JSON file in the directory
file_schema = spark.read.format("json").load("s3a://dir/*.json")

file_schema.printSchema()

# Persist the inferred schema so other jobs can reuse it
with open("/dbfs/FileStore/schemas/schema.json", "w") as f:
    json.dump(file_schema.schema.jsonValue(), f)

Now, in the new job, I load the schema file and use it on the read:

with open("/dbfs/FileStore/schemas/schema.json") as f:
    the_schema = StructType.fromJson(json.load(f))

You can then reference it in the schema option:

file_reader = spark.readStream.format('json') \
    .schema(the_schema) \
    .load(your_path_to_files)

This is a scrubbed-down version, but it points you in the right direction and gives you a managed schema that you can reference. If you have files with the same field name but different value types, you will want that field to be saved as a string.
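
For example, something along these lines (the field name is only illustrative):

from pyspark.sql.types import StructType, StructField, StringType

# Illustrative only: force a field that shows up with conflicting types
# to always be read as a plain string
patched_fields = [
    StructField(f.name, StringType(), True) if f.name == "payment_info" else f
    for f in the_schema.fields
]
the_schema = StructType(patched_fields)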

However, for your use case I would suggest just using Auto Loader: let Auto Loader store the fields as strings and apply your transformations downstream. I use it for 250k new JSON files a day with a schema of over 350 fields.
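
A bare-bones Auto Loader sketch (paths and checkpoint location are placeholders; with cloudFiles.inferColumnTypes left at its default of false, fields are inferred as strings and nested objects land as JSON strings):

stream_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/mnt/schemas/shops") \
    .load("/mnt/raw/shops")

stream_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/mnt/checkpoints/shops") \
    .option("mergeSchema", "true") \
    .toTable(table_name)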

Upvotes: 0
