Tim

Reputation: 1471

Databricks - readStream from Delta table, writeStream to ORC file with only the changes

A pipeline runs every 20 minutes pushing data to ADLS Gen2 storage in ORC format.

  1. I have an Azure Databricks notebook job which runs every hour. This job reads the ORC files from ADLS as a structured stream (the ORC files are created by the pipeline mentioned above), then uses the merge functionality to upsert the data into a Delta table based on a primaryKey column.
event_readstream = ( 
    spark.readStream
    .format("orc")
    .schema('my-schema.json')
    .load('/mnt/path/from/adls/to/orc/files/')
  )
  ...

from pyspark.sql import functions as F

def upsertEventToDeltaTable(df, batch_id):
  # partition_column is defined elsewhere in the notebook
  input_df = (
    df
    .drop('address_line3')
    .dropDuplicates(['primaryKey'])
    .withColumn(partition_column, F.date_format(F.col('updateddate'), 'yyyy').cast('int'))
  )

  input_df.createOrReplaceTempView("event_df_updates")

  # run the MERGE in the SparkSession that owns the micro-batch DataFrame
  input_df._jdf.sparkSession().sql("""
    MERGE INTO events dbEvent
    USING event_df_updates dfEvent
    ON dbEvent.primaryKey = dfEvent.primaryKey
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

event_writestream = (
    event_readstream
    .drop('address_line3')  # some column to be dropped
    .repartition(1)
    .writeStream
    .trigger(once=True)
    .format("delta")
    .option("checkpointLocation", "{}/checkpoints/{}".format(adls_mount_path, mytable))
    .foreachBatch(upsertEventToDeltaTable)
    .start()
)
  2. The same notebook also uses the same read stream (structured stream) and writes the data directly to a different location in ADLS Gen2 storage. This write also uses foreachBatch() with the checkpoint option enabled.

def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

location_writestream = ( 
  event_readstream   # Same read stream is used here 
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)
  .start()
)

Question:

In point #2 above, instead of using the read stream that reads from the ORC files, suppose I create a new read stream using the Delta table path, like below. Will this give me only the changed data from the Delta table, or are there any issues with this approach?

deltatbl_event_readstream = (
    spark.readStream
    .format("delta")
    .load("/mnt/delta/myadlsaccnt/user_events")  # my delta table location
)
def loadToLocation(df, batch_id): 
  (
    df
    .repartition(1)
    .write
    .partitionBy('updateddate')
    .format("orc")
    .mode("append")
    .save('{}/event/data'.format(adls_mount_path))  
  )

(
  deltatbl_event_readstream
  .writeStream
  .trigger(once=True)
  .option("checkpointLocation", "{}/checkpoints/event".format(adls_mount_path))
  .foreachBatch(loadToLocation)  # re-using the same method
  .start()
)

Upvotes: 1

Views: 2907

Answers (2)

Tim

Reputation: 1471

I came across a DATA+AI Summit link which has a demo for such a scenario.

In my case, each batch has >90% new rows and fewer updates, so I can't use this option, but it might help others.

The below is similar to Alex Ott's answer, with some additional info added.

Per the recommendation, if a large share of each batch consists of updates, CDF might not be effective.

  • To enable the CDF feature:
%sql
ALTER TABLE <table-name> SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
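
If the table is created fresh, the property can also be set at creation time or as a session default for new tables. A minimal sketch in Python; the table name and schema are placeholders, and the session config key (taken from the Databricks CDF docs) should be verified against your DBR version:

# Session-wide default: new Delta tables created in this session get CDF enabled
# (assumed config key; check the Databricks CDF documentation for your runtime).
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")

# Or set the property explicitly when creating a table (placeholder name/schema)
spark.sql("""
  CREATE TABLE IF NOT EXISTS events_cdf_demo (primaryKey STRING, updateddate DATE)
  USING delta
  TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")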
  • Perform any update/insert operation on the table
  • Use the table_changes() function to view the changes:
%sql 
select * from table_changes('<table-name>', <start-commit-version>, <end-commit-version>)
  • To read the change feed as a stream:
event_read_stream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "latest")
    .table("event")  # table name
    .filter("_change_type != 'update_preimage'")
)
  • Create the upsert function which merges the changes (a minimal sketch is shown after the write-stream snippet below)
  • Use a write stream to write the info:
(
  event_read_stream
  .writeStream
  .format("delta")
  .trigger(processingTime="2 seconds")  # or trigger(once=True) if the job runs once
  .outputMode("append")
  .start()  # a sink (path/table or foreachBatch) still needs to be supplied
)
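
For completeness, here is a minimal sketch of what such an upsert function could look like. The function name, the downstream table events_copy, and the key column primaryKey are illustrative assumptions, not part of the original answer:

from delta.tables import DeltaTable

def upsertChangesToDeltaTable(df, batch_id):
  # drop the CDF metadata columns and keep one row per key
  # (mirroring the dropDuplicates approach used in the question)
  changes_df = (
    df.drop("_change_type", "_commit_version", "_commit_timestamp")
      .dropDuplicates(["primaryKey"])
  )

  # 'events_copy' is a hypothetical downstream Delta table with the same schema
  target = DeltaTable.forName(spark, "events_copy")
  (
    target.alias("t")
    .merge(changes_df.alias("s"), "t.primaryKey = s.primaryKey")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

It would be wired into the write stream above via .foreachBatch(upsertChangesToDeltaTable), as in the question's own examples.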

Upvotes: 0

Alex Ott

Reputation: 87174

If you just use a plain readStream on a Delta table without any options, then you won't get information about updates. In fact, the stream will fail after an update until you set the option ignoreChanges. That comes from the fact that Delta doesn't track changes: when you make an update/delete, it rewrites the existing files, so by looking into a file you just see the data and don't know whether it was an insert or an update.
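
As a rough illustration of that option, here is a minimal sketch that reuses the Delta path from the question (not code from the original answer):

# with ignoreChanges the stream no longer fails on updates/deletes, but files
# rewritten by an update are re-delivered in full, so downstream consumers may
# receive duplicate rows
deltatbl_event_readstream = (
    spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load("/mnt/delta/myadlsaccnt/user_events")
)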

But if you need to stream changes from Delta, then you can use the Delta Change Data Feed (CDF) functionality, introduced in Delta 8.4 (if I remember correctly). To make it work, you need to enable it on the source Delta table by setting the property delta.enableChangeDataFeed to true. Since that version, you'll be able to read the feed of changes with something like this:

deltatbl_event_readstream = spark.readStream.format("delta")\
  .option("readChangeFeed", "true") \
  .option("startingVersion", <version_of_delta_when_you_enable_cdf>) \
  .load("...")

This will add three additional columns that describe the operation performed (_change_type), the version of the Delta table (_commit_version), and the timestamp (_commit_timestamp). If you need to track only changes, you need to select only the rows where the _change_type column has the value update_postimage, and after that you can store that data wherever you need.
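
A minimal sketch of that filtering step, continuing from the read stream above; the checkpoint path and the reuse of the question's loadToLocation function are illustrative assumptions:

# keep only the post-update images and hand each micro-batch to loadToLocation,
# which writes ORC as in the question
(
    deltatbl_event_readstream
    .filter("_change_type = 'update_postimage'")
    .writeStream
    .trigger(once=True)
    .option("checkpointLocation", "/mnt/checkpoints/event_changes")  # placeholder path
    .foreachBatch(loadToLocation)
    .start()
)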

But please take into account that after you enable CDF on the table, other clients (DBR < 8.4, OSS) won't be able to write into that table, although they will continue to be able to read the data.

Upvotes: 1
