Reputation: 33
I use Databricks Auto Loader to ingest files that contain data with different schemas, and I want to write them to the corresponding Delta tables using update mode.
There may be many (>15) different message types in a stream, so I would have to write an output stream for every one of them. Each table has its own "upsert" function.
Can this be condensed using a function (example given below) that will save a few keystrokes?
upload_path = '/example'
# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
.option('cloudFiles.format', 'avro') \
.load(upload_path)
# filter messages and apply JSON schema
table1_df = filter_and_transform(df, json_schema1)
table2_df = filter_and_transform(df, json_schema2)
table3_df = filter_and_transform(df, json_schema3)
# each table has its own upsert function
def create_output_stream(df, table_name, upsert_function):
    # Build the stream writer and return it without starting it.
    # foreachBatch acts as the sink, so no separate .format("delta") is needed.
    return df.writeStream \
        .trigger(once=True) \
        .foreachBatch(upsert_function) \
        .queryName(f"autoLoader_query_{table_name}") \
        .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
        .outputMode("update")
output_stream1 = create_output_stream(table1_df, "table_name1", upsert_function1).start() # start stream in outer environment
output_stream2 = create_output_stream(table2_df, "table_name2", upsert_function2).start()
output_stream3 = create_output_stream(table3_df, "table_name3", upsert_function3).start()
Upvotes: 2
Views: 579
Reputation: 87254
Yes, of course it's possible to do it this way - it's quite a standard pattern.
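With more than 15 message types, the repeated calls can be driven from a single mapping instead of one line per table. The following is only a sketch: `start_all_streams` and the `table_configs` mapping are hypothetical names introduced here, and `filter_and_transform`, the schemas and the upsert functions are assumed to be the ones from the question.

```python
def create_output_stream(df, table_name, upsert_function):
    # Same helper as in the question: build the writer, do not start it.
    return df.writeStream \
        .trigger(once=True) \
        .foreachBatch(upsert_function) \
        .queryName(f"autoLoader_query_{table_name}") \
        .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
        .outputMode("update")


def start_all_streams(df, table_configs, filter_and_transform):
    """Start one output stream per entry in table_configs.

    table_configs (hypothetical) maps table name -> (json_schema, upsert_function).
    Returns a dict of table name -> started query handle.
    """
    queries = {}
    for table_name, (json_schema, upsert_function) in table_configs.items():
        # filter the shared input stream down to one message type
        table_df = filter_and_transform(df, json_schema)
        queries[table_name] = create_output_stream(
            table_df, table_name, upsert_function
        ).start()
    return queries
```

It would be called as `start_all_streams(df, {"table_name1": (json_schema1, upsert_function1), ...}, filter_and_transform)`, so adding a new message type only means adding one entry to the mapping.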
But you need to take one thing into consideration: if your input data isn't partitioned by message type, you will scan the same files multiple times (once for each message type). An alternative is to perform the filtering and upsert of all message types in a single foreachBatch, like this:
df = spark.readStream.format('cloudFiles') \
.option('cloudFiles.format', 'avro') \
.load(upload_path)
def do_all_upserts(df, epoch):
    # cache the micro-batch so it is read once and reused by every filter below
    df.cache()
    table1_df = filter_and_transform(df, json_schema1)
    table2_df = filter_and_transform(df, json_schema2)
    table3_df = filter_and_transform(df, json_schema3)
    # really you can run multiple writes using multithreading, or something like it
    do_upsert(table1_df)
    do_upsert(table2_df)
    ...
    # free resources
    df.unpersist()

# A single stream handles every message type, so it needs only one checkpoint.
# The "all_tables" directory name is a placeholder, pick your own.
df.writeStream \
    .trigger(once=True) \
    .foreachBatch(do_all_upserts) \
    .option("checkpointLocation", "dbfs:/delta/somepath/all_tables") \
    .start()
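The comment about multithreading inside `do_all_upserts` could look roughly like the sketch below. It uses only the standard library's `ThreadPoolExecutor`; the helper name `run_upserts_concurrently` and the `max_workers` default are assumptions made for illustration. Whether it is actually faster than sequential writes depends on how much spare capacity the cluster has.

```python
from concurrent.futures import ThreadPoolExecutor


def run_upserts_concurrently(batch_df, upsert_jobs, max_workers=4):
    """Run every job in upsert_jobs against the same micro-batch in parallel.

    upsert_jobs is a list of callables that each take the batch DataFrame.
    Results are returned in the same order as the jobs were given.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(job, batch_df) for job in upsert_jobs]
        # result() re-raises any exception from a worker thread, so a failed
        # upsert fails the whole micro-batch instead of being silently lost
        return [future.result() for future in futures]
```

Inside `do_all_upserts`, the sequential `do_upsert(...)` calls would then become something like `run_upserts_concurrently(df, [lambda d: do_upsert(filter_and_transform(d, json_schema1)), ...])`, still between `df.cache()` and `df.unpersist()`.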
Upvotes: 1