Reputation: 243
I have a streaming DataFrame in Spark reading from a Kafka topic, and I want to drop duplicates over the past 5 minutes every time a new record is parsed.
I am aware of the dropDuplicates(["uid"])
function, but I am not sure how to check for duplicates over a specific historic time interval.
My understanding is that the following:
df = df.dropDuplicates(["uid"])
works either on the data read in the current (micro)batch or on "anything" that is currently held in memory.
Is there a way to set the time span for this de-duplication, using a "timestamp"
column within the data?
Thanks in advance.
Upvotes: 0
Views: 1384
Reputation: 4045
# The watermark bounds how long Spark keeps de-duplication state for each key
# (5 seconds in this example; use "5 minutes" for the window in the question).
df\
    .withWatermark("event_time", "5 seconds")\
    .dropDuplicates(["User", "uid"])\
    .groupBy("User")\
    .count()\
    .writeStream\
    .queryName("pydeduplicated")\
    .format("memory")\
    .outputMode("complete")\
    .start()
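(The memory sink and complete output mode here just make the de-duplicated counts queryable with spark.sql("select * from pydeduplicated") while testing.)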
For more info you can refer to https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
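Adapting this to the Kafka stream in the question, a minimal sketch could look like the code below. The broker address, topic name, message schema, and the column names uid and timestamp are assumptions; the combination of withWatermark plus dropDuplicates on both the id and the event-time column follows the Structured Streaming programming guide, so de-duplication state older than 5 minutes of event time can be dropped.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("kafka-dedup").getOrCreate()

# Assumed message payload: a unique id plus an event-time column.
schema = StructType([
    StructField("uid", StringType()),
    StructField("timestamp", TimestampType()),
])

# Read and parse the Kafka topic (broker and topic names are placeholders).
parsed = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "my_topic")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*"))

# Keep only 5 minutes of de-duplication state, keyed on uid and timestamp.
deduped = (parsed
    .withWatermark("timestamp", "5 minutes")
    .dropDuplicates(["uid", "timestamp"]))

query = (deduped.writeStream
    .format("console")
    .outputMode("append")
    .start())

Note that with this pattern two records count as duplicates only when both uid and timestamp match; on Spark 3.5+ you could instead use dropDuplicatesWithinWatermark(["uid"]) to de-duplicate on uid alone while still expiring old state according to the watermark.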
Upvotes: 1