Ravi Patel

Reputation: 121

Dynamic Delta Table as Target for Spark Streaming

We are processing a stream of web logs, basically activities that users perform on the website. For each activity type, we have a separate activity Delta table.

We are exploring the best way to do streaming ingest. We have a Kafka stream set up where all activities arrive in the following format, but depending on the activity, the event needs to be routed to a different target table.

{
   "activity_name": "Purchased",
   "data": {
      "product": "Soap",
      "amount": 1200
   }
}

What is the best way to handle this scenario?

Upvotes: 1

Views: 622

Answers (1)

Alex Ott

Reputation: 87069

This is called multiplexing. Usually the solution is to use Structured Streaming with the .foreachBatch function as a sink, and inside that function write the data for each of the possible values of activity_name.

Something like this (as shown, for example, in this blog post):

activity_names = ["Purchased", ...]
app_name = "my_app_name"

def write_tables(df, epoch):
  # Cache the microbatch so it isn't recomputed for every activity type
  df.cache()
  for n in activity_names:
    # Route each activity type to its own Delta table;
    # txnVersion/txnAppId make the write idempotent per microbatch
    df.filter(f"activity_name = '{n}'") \
      .write.format("delta") \
      .mode("append") \
      .option("txnVersion", epoch) \
      .option("txnAppId", app_name) \
      .save(...)

  df.unpersist()


stream_df.writeStream \
  .foreachBatch(write_tables) \
  .option("checkpointLocation", "some_path") \
  .start()

Please note that we're using idempotent writes for Delta tables to avoid duplicates if the microbatch is restarted in the middle of execution.
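
For completeness, here is a minimal sketch of how stream_df could be produced from the Kafka topic described in the question. The broker address, topic name, and schema are assumptions based on the sample event, not details from the original setup:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema matching the sample event from the question
event_schema = StructType([
    StructField("activity_name", StringType()),
    StructField("data", StructType([
        StructField("product", StringType()),
        StructField("amount", LongType()),
    ])),
])

# Assumes an existing SparkSession named `spark`;
# broker address and topic name are placeholders
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "web_activities") \
    .load() \
    .select(F.from_json(F.col("value").cast("string"), event_schema).alias("event")) \
    .select("event.*")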

Upvotes: 1
