AKSHAY SHINGOTE
AKSHAY SHINGOTE

Reputation: 407

Handling duplicates while processing Streaming data in Databricks Delta table with Spark Structured Streaming?

I am using Spark Structured Streaming with Azure Databricks Delta where I am writing to Delta table (delta table name is raw).I am reading from Azure files where I am receiving out of order data and I have 2 columns in it "smtUidNr" and "msgTs".I am trying to handle duplicates by using Upsert in my code but when I query my delta table "raw". I see following duplicate records in my delta table

    smtUidNr                                 msgTs
    57A94ADA218547DC8AE2F3E7FB14339D    2019-08-26T08:58:46.000+0000
    57A94ADA218547DC8AE2F3E7FB14339D    2019-08-26T08:58:46.000+0000
    57A94ADA218547DC8AE2F3E7FB14339D    2019-08-26T08:58:46.000+0000

Following is my code:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


// merge duplicates
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {


  microBatchOutputDF.createOrReplaceTempView("updates")


  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO raw t
    USING updates s
    ON (s.smtUidNr = t.smtUidNr and s.msgTs>t.msgTs) 
    WHEN MATCHED THEN UPDATE SET * 
    WHEN NOT MATCHED THEN INSERT *
  """)
}


val df=spark.readStream.format("delta").load("abfss://[email protected]/entrypacket/")
df.createOrReplaceTempView("table1")
val entrypacket_DF=spark.sql("""SELECT details as dcl,invdetails as inv,eventdetails as evt,smtdetails as smt,msgHdr.msgTs,msgHdr.msgInfSrcCd FROM table1 LATERAL VIEW explode(dcl) dcl AS details LATERAL VIEW explode(inv) inv AS invdetails LATERAL VIEW explode(evt) evt as eventdetails LATERAL VIEW explode(smt) smt as smtdetails""").dropDuplicates()


entrypacket_DF.createOrReplaceTempView("ucdx")

//Here, we are adding a column date_timestamp which converts msgTs timestamp to YYYYMMDD format in column date_timestamp which eliminates duplicate for today & then we drop this column meaning which we are not tampering with msgTs column
val resultDF=spark.sql("select dcl.smtUidNr,dcl,inv,evt,smt,cast(msgTs as timestamp)msgTs,msgInfSrcCd from ucdx").withColumn("date_timestamp",to_date(col("msgTs"))).dropDuplicates(Seq("smtUidNr","date_timestamp")).drop("date_timestamp")

resultDF.createOrReplaceTempView("final_tab")

val finalDF=spark.sql("select distinct smtUidNr,max(dcl) as dcl,max(inv) as inv,max(evt) as evt,max(smt) as smt,max(msgTs) as msgTs,max(msgInfSrcCd) as msgInfSrcCd from final_tab group by smtUidNr")


finalDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("update").start()

Structured Streaming does not support aggregation,window function & order by clause? What can I do to modify in my code so that I can have only 1 record of particular smtUidNr?

Upvotes: 0

Views: 5138

Answers (2)

Vignesh G
Vignesh G

Reputation: 151

The following snippet helps you find the latest record if there exist multiple rows of the same unique id. And also pick up only one row if multiple rows are exactly the same.

Let the unique key/keys with which you will filter your row/record be 'id'. You have a 'timestamp' column to find the latest record for the same id.

def upsertToDelta(micro_batch_df, batchId) :
   delta_table = DeltaTable.forName(spark, f'{database}.{table_name}')
   df = micro_batch_df.dropDuplicates(['id']) \
       .withColumn("r", rank().over(Window.partitionBy('id') \
       .orderBy(col('timestamp').desc()))).filter("r==1").drop("r")
   delta_table.alias("t") \
      .merge(df.alias("s"), 's.id = t.id') \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()
final_df.writeStream \
  .foreachBatch(upsertToDelta) \
  .option('checkpointLocation', '/mnt/path/checkpoint') \
  .outputMode('update') \
  .start()

Upvotes: 0

Silvio
Silvio

Reputation: 4197

What you need to do is dedup in the foreachBatch method, so you're ensuring each batch merge is writing only a single value for each key.

In your example, you would do the following:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {

  microBatchOutputDF
    .select('smtUidNr, struct('msgTs, 'dcl, 'inv, 'evt, 'smt, 'msgInfSrcCd).as("cols"))
    .groupBy('smtUidNr)
    .agg(max('cols).as("latest"))
    .select("smtUidNr", "latest.*")
    .createOrReplaceTempView("updates")

  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO raw t
    USING updates s
    ON (s.smtUidNr = t.smtUidNr and s.msgTs>t.msgTs) 
    WHEN MATCHED THEN UPDATE SET * 
    WHEN NOT MATCHED THEN INSERT *
  """)
}

finalDF.writeStream.foreachBatch(upsertToDelta _).outputMode("update").start()

You can see some more examples on the docs, here

Upvotes: 3

Related Questions