Ganesha

Reputation: 145

Eliminate duplicates (deduplication) in Streaming DataFrame

I have a Spark streaming processor. The DataFrame dfNewExceptions contains duplicates (duplicated by "ExceptionId"). Since this is a streaming dataset, the query below fails:

val dfNewUniqueExceptions = dfNewExceptions
  .sort(desc("LastUpdateTime"))
  .coalesce(1)
  .dropDuplicates("ExceptionId")

val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")

dfNewExceptionCore.writeStream
  .format("console")
//  .outputMode("complete")
  .option("truncate", "false")
  .option("numRows", 5000)
  .start()
  .awaitTermination(1000)

Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

This is also documented here: https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-programming-guide.html

Any suggestions on how the duplicates can be removed from dfNewExceptions?

Upvotes: 3

Views: 8288

Answers (2)

Michael Heil

Reputation: 18475

I recommend following the approach explained in the Structured Streaming Guide on Streaming Deduplication. There it says:

You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as de-duplication on static DataFrames using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use de-duplication with or without watermarking.

With watermark - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates anymore. This bounds the amount of state the query has to maintain.

An example in Scala is also given:

val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ... 

dfExceptions 
  .withWatermark("LastUpdateTime", "10 seconds") 
  .dropDuplicates("ExceptionId", "LastUpdateTime")
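
Putting this together with your write query, a sketch might look like the following. It assumes LastUpdateTime is a TimestampType column, and the 10-second delay is a placeholder for however late a duplicate can actually arrive in your data:

// dfNewExceptions is your streaming DataFrame from the question;
// LastUpdateTime must be a timestamp column for the watermark to apply.
val dfNewUniqueExceptions = dfNewExceptions
  .withWatermark("LastUpdateTime", "10 seconds")
  .dropDuplicates("ExceptionId", "LastUpdateTime") // event-time column included so old state can be dropped

dfNewUniqueExceptions
  .select("ExceptionId", "LastUpdateTime")
  .writeStream
  .format("console")
  .option("truncate", "false")
  .option("numRows", 5000)
  .start()
  .awaitTermination(1000)

Note that this keeps the first record seen per (ExceptionId, LastUpdateTime) pair rather than the one with the latest LastUpdateTime; picking the latest record per key would require an aggregation instead of a sort.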

Upvotes: 3

Robert Kossendey

Reputation: 6998

You can use watermarking to drop duplicates within a specific timeframe.
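
A minimal sketch of what that could look like, assuming LastUpdateTime is the event-time column and treating the 10-minute delay as a placeholder:

// Rows whose event time falls behind the watermark are aged out of state,
// so the deduplication buffer stays bounded.
val deduped = dfNewExceptions
  .withWatermark("LastUpdateTime", "10 minutes")
  .dropDuplicates("ExceptionId", "LastUpdateTime")

On Spark 3.5 and later there is also dropDuplicatesWithinWatermark, which deduplicates by key alone (e.g. just "ExceptionId") within the watermark delay.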

Upvotes: 0
