Reputation: 1471
A pipeline runs every 20 minutes, pushing data to ADLS Gen2 storage in ORC format.
import json
from pyspark.sql.types import StructType

# .schema() expects a StructType or DDL string, not a file path
event_schema = StructType.fromJson(json.load(open('my-schema.json')))

event_readstream = (
    spark.readStream
    .format("orc")
    .schema(event_schema)
    .load('/mnt/path/from/adls/to/orc/files/')
)
...
def upsertEventToDeltaTable(df, batch_id):
    input_df = (
        df
        .drop('address_line3')
        .dropDuplicates(['primaryKey'])
        # partition_column is defined elsewhere; derives the partition year from updateddate
        .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
    )
    input_df.createOrReplaceTempView("event_df_updates")
    # grab the micro-batch's SparkSession to run the MERGE
    # (on PySpark 3.3+ this can be written as input_df.sparkSession.sql(...))
    input_df._jdf.sparkSession().sql("""
        MERGE INTO events dbEvent
        USING event_df_updates dfEvent
        ON dbEvent.primaryKey = dfEvent.primaryKey
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
event_writestream = (
    event_readstream
    .drop('address_line3')  # some column to be dropped
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path, mytable))
    .foreachBatch(upsertEventToDeltaTable)
    .start()
)
A second stream (point #2 referenced in the question below) writes the same data to another location, again using foreachBatch() from a writeStream with checkpoint options enabled:
def loadToLocation(df, batch_id):
    (
        df
        .repartition(1)
        .write
        .partitionBy('updateddate')
        .format("orc")
        .mode("append")
        .save('{}/event/data'.format(adls_mount_path))
    )
location_writestream = (
    event_readstream  # same read stream is used here
    .writeStream
    .trigger(once=True)
    .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
    .foreachBatch(loadToLocation)
    .start()
)
Question:
In point #2 above, instead of using the readStream that reads from the ORC files, can I create a new readStream using the Delta table path, like below?
deltatbl_event_readstream = spark.readStream.format("delta") \
    .load("/mnt/delta/myadlsaccnt/user_events")  # my delta table location
def loadToLocation(df, batch_id):
    (
        df
        .repartition(1)
        .write
        .partitionBy('updateddate')
        .format("orc")
        .mode("append")
        .save('{}/event/data'.format(adls_mount_path))
    )
(
    deltatbl_event_readstream
    .writeStream
    .trigger(once=True)
    .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
    .foreachBatch(loadToLocation)  # re-using the same method
    .start()
)
Upvotes: 1
Views: 2907
Reputation: 1471
I came across a DATA+AI Summit session that has a demo for exactly this scenario.
In my case each batch has >90% new rows and few updates, so I can't use this option, but it might help others.
The below is similar to Alex Ott's answer, with some additional info added.
Per the recommendation, if batches contain mostly updates, CDF might not be effective.
%sql
ALTER TABLE <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
Use the table_changes() function to view the changes:
%sql
SELECT * FROM table_changes('<table-name>', <start-commit-version>, <end-commit-version>)
event_read_stream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "latest")
    .table("event")  # table name
    .filter("_change_type != 'update_preimage'")
)
(
    event_read_stream.writeStream.format("delta")
    # or trigger(once=True) if the job runs only once
    .trigger(processingTime="2 seconds")
    .option("checkpointLocation", "<checkpoint-path>")  # required for a Delta sink
    .outputMode("append")
    .start("<target-path>")
)
Upvotes: 0
Reputation: 87174
If you just use a plain readStream on a Delta table without any options, you won't get information about updates. In fact, the stream will fail after an update until you set the option ignoreChanges. That comes from the fact that Delta doesn't track changes: when you make an update/delete, it rewrites existing files, so by looking into a file you just see the data and don't know whether it was an insert or an update.
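As a minimal sketch of that option, assuming the Delta table path from the question (note that ignoreChanges re-delivers entire rewritten files, so duplicate rows are possible downstream):
stream = (
    spark.readStream
    .format("delta")
    # lets the stream continue past rewritten files instead of failing
    .option("ignoreChanges", "true")
    .load("/mnt/delta/myadlsaccnt/user_events")
)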
But if you need to stream changes from Delta, then you can use the Delta Change Data Feed (CDF) functionality, introduced in DBR 8.4 (if I remember correctly). To make it work, you need to enable it on the source Delta table by setting the table property delta.enableChangeDataFeed to true. Since that version, you're able to read the feed of changes with something like this:
deltatbl_event_readstream = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
    .load("...")
This will add three additional columns that describe the operation performed (_change_type), the version of the Delta table (_commit_version), and the timestamp (_commit_timestamp). If you need to track only updates, select only the rows where the _change_type column has the value update_postimage, and after that you can store that data wherever you need.
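For example, a sketch building on the stream above (the sink path and checkpoint location are placeholders):
(
    deltatbl_event_readstream
    # keep only the post-update row images
    .filter("_change_type = 'update_postimage'")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "<checkpoint-path>")  # placeholder
    .start("<target-path>")  # placeholder
)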
But please take into account that after you enable CDF on the table, other clients (DBR < 8.4, OSS Delta) won't be able to write into that table, although they will continue to be able to read the data.
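If you're unsure whether CDF is already enabled on a table, you can inspect its properties with standard Spark SQL (the table name is a placeholder):
%sql
SHOW TBLPROPERTIES <table-name>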
Upvotes: 1