Reputation: 145
I have a Spark streaming processor. The Dataframe dfNewExceptions has duplicates (duplicate by "ExceptionId"). Since this is a streaming dataset, the below query fails:
val dfNewUniqueExceptions = dfNewExceptions.sort(desc("LastUpdateTime"))
.coalesce(1)
.dropDuplicates("ExceptionId")
val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")
dfNewExceptionCore.writeStream
.format("console")
// .outputMode("complete")
.option("truncate", "false")
.option("numRows",5000)
.start()
.awaitTermination(1000)
** Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;; **
This is also documented here: https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-programming-guide.html
Any suggestions on how the duplicates can be removed from dfNewExceptions?
Upvotes: 3
Views: 8288
Reputation: 18475
I recommend to follow the approach explained in the Structured Streaming Guide on Streaming Deduplication. There it says:
You can deduplicate records in data streams using a unique identifier in the events. This is exactly same as de-duplication on static using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use de-duplication with or without watermarking.
With watermark - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of the state the query has to maintain.
An example in Scala is also given:
val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ...
dfExceptions
.withWatermark("LastUpdateTime", "10 seconds")
.dropDuplicates("ExceptionId", "LastUpdateTime")
Upvotes: 3
Reputation: 6998
You can use watermarking to drop duplicates in a specific timeframe.
Upvotes: 0