Reputation: 121
We are processing a stream of web logs: activities that users perform on the website. For each activity type, we have a separate Delta table.
We are exploring the best way to do streaming ingest. We have a Kafka stream set up where all the activities arrive in the following format, but depending on the activity, we need to route each event to a different target table.
{
  "activity_name": "Purchased",
  "data": {
    "product": "Soap",
    "amount": 1200
  }
}
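For reference, here is roughly how we read the stream today (a sketch; the broker address, topic name, and schema fields are placeholders based on the sample above):

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.appName("activity-ingest").getOrCreate()

# Schema matching the sample event above
schema = StructType([
    StructField("activity_name", StringType()),
    StructField("data", StructType([
        StructField("product", StringType()),
        StructField("amount", LongType()),
    ])),
])

stream_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "activities")
    .load()
    # The Kafka value column is binary; parse the JSON payload into columns
    .select(from_json(col("value").cast("string"), schema).alias("event"))
    .select("event.*"))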
What is the best way to handle this scenario?
Upvotes: 1
Views: 622
Reputation: 87069
This is called multiplexing. The usual solution is to use structured streaming with a .foreachBatch function as the sink, and inside that function write the data out once for each possible value of activity_name.
Something like this (as shown, for example, in this blog post):
activity_names = ["Purchased", ...]
app_name = "my_app_name"

def write_tables(df, epoch):
    # Cache the microbatch so it isn't recomputed for every filter below
    df.cache()
    for n in activity_names:
        df.filter(f"activity_name = '{n}'") \
            .write.format("delta") \
            .mode("append") \
            .option("txnVersion", epoch) \
            .option("txnAppId", app_name) \
            .save(...)
    df.unpersist()

stream_df.writeStream \
    .foreachBatch(write_tables) \
    .option("checkpointLocation", "some_path") \
    .start()
Please note that we're using idempotent writes for the Delta tables (the txnVersion and txnAppId options) to avoid duplicates if the microbatch is restarted in the middle of execution.
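For completeness, here is one hypothetical way to fill in the save target, assuming each activity gets its own Delta table under a common base path (the /delta/activities prefix is only an assumption for illustration):

# Hypothetical helper: map an activity name to its Delta table path.
# The base path is an assumption; use whatever layout your tables follow.
base_path = "/delta/activities"

def table_path(activity_name):
    # e.g. "Purchased" -> "/delta/activities/purchased"
    return f"{base_path}/{activity_name.lower()}"

The loop body would then call .save(table_path(n)). Delta records the latest txnAppId/txnVersion pair per target table, so if the microbatch is retried, tables that already committed that epoch skip the duplicate write.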
Upvotes: 1