DisplayName

Reputation: 33

Create Spark output streams with function

I use Databricks Auto Loader to ingest files that contain data with different schemas, and I want to write each message type to its corresponding Delta table using update mode.

There may be many (>15) different message types in a stream, so I'd have to write an output stream for every one of them. There is an "upsert" function for every table.

Can this be condensed using a function (example given below) that will save a few keystrokes?

upload_path = '/example'

# Set up the stream to begin reading incoming files from the
# upload_path location.
df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'avro') \
  .load(upload_path)

# filter messages and apply JSON schema
table1_df = filter_and_transform(df, json_schema1)
table2_df = filter_and_transform(df, json_schema2)
table3_df = filter_and_transform(df, json_schema3)

# each table has its own upsert function
def create_output_stream(df, table_name, upsert_function):
    # Build the stream writer and return it without starting it.
    # foreachBatch supplies the sink, so no .format() call is needed.
    return df.writeStream \
         .trigger(once=True) \
         .foreachBatch(upsert_function) \
         .queryName(f"autoLoader_query_{table_name}") \
         .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
         .outputMode("update")

output_stream1 = create_output_stream(table1_df, "table_name1", upsert_function1).start() # start stream in outer environment
output_stream2 = create_output_stream(table2_df, "table_name2", upsert_function2).start()
output_stream3 = create_output_stream(table3_df, "table_name3", upsert_function3).start()


Upvotes: 2

Views: 579

Answers (1)

Alex Ott

Reputation: 87254

Yes, of course it's possible to do it this way - it's quite a standard pattern.
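With more than 15 message types, the per-table calls can also be driven from a lookup table instead of being written out one by one. The following is only a sketch under assumptions: `start_output_streams` and the `table_specs` dictionary are hypothetical names, `filter_and_transform` is the asker's own helper and is passed in as an argument, and the checkpoint path is the placeholder from the question.

```python
def create_output_stream(df, table_name, upsert_function):
    # Build the stream writer and return it without starting it.
    return (
        df.writeStream
        .trigger(once=True)
        .foreachBatch(upsert_function)
        .queryName(f"autoLoader_query_{table_name}")
        .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}")
        .outputMode("update")
    )


def start_output_streams(source_df, table_specs, filter_and_transform):
    """Start one streaming query per table.

    table_specs (hypothetical): dict mapping table_name to a
    (json_schema, upsert_function) pair.
    """
    queries = {}
    for table_name, (json_schema, upsert_function) in table_specs.items():
        table_df = filter_and_transform(source_df, json_schema)
        writer = create_output_stream(table_df, table_name, upsert_function)
        queries[table_name] = writer.start()
    return queries
```

Called as `start_output_streams(df, {"table_name1": (json_schema1, upsert_function1), ...}, filter_and_transform)`, this returns the started queries keyed by table name, so each one can still be monitored or awaited individually.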

But keep one thing in mind: if your input data isn't partitioned by message type, the same files will be scanned multiple times (once for each message type). An alternative is to perform the filtering and upserts for all message types inside a single foreachBatch, like this:

df = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'avro') \
  .load(upload_path)

def do_all_upserts(df, epoch):
  # cache the micro-batch so that it is read only once for all tables
  df.cache()
  table1_df = filter_and_transform(df, json_schema1)
  table2_df = filter_and_transform(df, json_schema2)
  table3_df = filter_and_transform(df, json_schema3)
  # the writes could also run in parallel, e.g. using multithreading
  do_upsert(table1_df)
  do_upsert(table2_df)
  ...
  # free resources
  df.unpersist()

# a single stream with a single checkpoint serves all tables;
# table_name must be defined before this line runs
df.writeStream \
         .trigger(once=True) \
         .foreachBatch(do_all_upserts) \
         .option("checkpointLocation", f"dbfs:/delta/somepath/{table_name}") \
         .start()
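The multithreading mentioned in the code comment could look roughly like the sketch below, which uses the standard library's `ThreadPoolExecutor`. The names `run_upserts_in_parallel`, `make_do_all_upserts` and `table_specs` are hypothetical and not part of any Spark API; each upsert function is assumed to take one DataFrame, like `do_upsert` above.

```python
from concurrent.futures import ThreadPoolExecutor


def run_upserts_in_parallel(upsert_tasks, max_workers=4):
    """Run every (upsert_function, table_df) pair in its own thread."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(upsert, table_df) for upsert, table_df in upsert_tasks]
        # result() re-raises any exception, so a failed upsert fails the batch
        return [future.result() for future in futures]


def make_do_all_upserts(table_specs, filter_and_transform):
    """Build the function to hand to foreachBatch.

    table_specs (hypothetical): list of (json_schema, upsert_function) pairs.
    """
    def do_all_upserts(df, epoch):
        df.persist()  # read the micro-batch once for all tables
        try:
            tasks = [
                (upsert, filter_and_transform(df, json_schema))
                for json_schema, upsert in table_specs
            ]
            run_upserts_in_parallel(tasks)
        finally:
            df.unpersist()  # free resources even if an upsert fails
    return do_all_upserts
```

It would be wired in with `.foreachBatch(make_do_all_upserts(table_specs, filter_and_transform))`. If one upsert fails, the others may already have been written and the micro-batch will be retried, so the upserts should be idempotent.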

Upvotes: 1
