romborimba

Reputation: 243

Drop duplicates over a time window in PySpark

I have a streaming DataFrame in Spark that reads from a Kafka topic, and every time a new record is parsed I want to drop duplicates seen within the past 5 minutes.

I am aware of the dropDuplicates(["uid"]) function, but I am not sure how to check for duplicates over a specific historic time interval.

My understanding is that the following:

df = df.dropDuplicates(["uid"])

operates either on the data read in the current (micro-)batch or on whatever is currently held in memory. Is there a way to bound this de-duplication in time, using a "timestamp" column in the data?

Thanks in advance.

Upvotes: 0

Views: 1384

Answers (1)

QuickSilver

Reputation: 4045

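# The watermark delay bounds how late (in event time) a record may arrive;
# it is set to 5 minutes here to match the window asked about in the question.
df \
  .withWatermark("event_time", "5 minutes") \
  .dropDuplicates(["User", "uid"]) \
  .groupBy("User") \
  .count() \
  .writeStream \
  .queryName("pydeduplicated") \
  .format("memory") \
  .outputMode("complete") \
  .start()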
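If only the de-duplication step is needed, it can be wrapped in a small helper. This is a minimal sketch under stated assumptions, not a definitive implementation: the helper name `dedupe_by_uid` is hypothetical, the column names `uid` and `timestamp` are taken from the question, and the `timestamp` column is assumed to already be of timestamp type, which `withWatermark` needs. As far as I understand it, `dropDuplicates` only lets Spark expire old state when the event-time column is among the de-duplication keys, and in that case two rows count as duplicates only if both `uid` and `timestamp` match. Spark 3.5 added `dropDuplicatesWithinWatermark`, which matches on `uid` alone within the watermark delay, so the sketch prefers it when it is available.

```python
def dedupe_by_uid(df, event_time_col="timestamp", delay="5 minutes"):
    """Drop repeated uids on a streaming DataFrame, bounding state with a watermark.

    Hypothetical helper; the column names are assumptions taken from the question.
    """
    # Rows whose event time lags the newest one seen by more than `delay`
    # are treated as too late and dropped.
    watermarked = df.withWatermark(event_time_col, delay)

    if hasattr(watermarked, "dropDuplicatesWithinWatermark"):
        # Spark 3.5+: rows sharing a uid are duplicates if they fall within `delay`.
        return watermarked.dropDuplicatesWithinWatermark(["uid"])

    # Older Spark: include the event-time column in the keys so that state
    # older than the watermark can be purged. Rows are then duplicates only
    # when both uid and the timestamp are identical.
    return watermarked.dropDuplicates(["uid", event_time_col])
```

It would be called as `deduped = dedupe_by_uid(df)` before continuing with `writeStream` as in the query above.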

For more information, see https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
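To make the idea of watermark-bounded de-duplication concrete, here is a toy model in plain Python. It is only an illustration of the concept, not Spark's actual implementation: it processes one record at a time rather than per micro-batch, and the function name `dedupe_within_window` and the `(uid, event_time)` record shape are assumptions made for the example.

```python
from datetime import datetime, timedelta

def dedupe_within_window(records, window=timedelta(minutes=5)):
    """Yield (uid, event_time) pairs whose uid is not already remembered.

    A uid is remembered until the watermark (newest event time seen minus
    `window`) moves past the event time of the record that was kept for it.
    """
    seen = {}              # uid -> event time of the record kept for that uid
    max_event_time = None  # newest event time observed so far
    for uid, event_time in records:
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - window

        # Forget uids whose kept record is now older than the watermark.
        for key in [k for k, t in seen.items() if t < watermark]:
            del seen[key]

        if event_time < watermark:
            continue  # too late: older than the watermark, dropped
        if uid in seen:
            continue  # duplicate inside the window, dropped
        seen[uid] = event_time
        yield uid, event_time

t0 = datetime(2024, 1, 1, 12, 0, 0)
events = [
    ("a", t0),
    ("a", t0 + timedelta(minutes=1)),  # repeat of "a" within 5 minutes
    ("b", t0 + timedelta(minutes=2)),
    ("a", t0 + timedelta(minutes=7)),  # "a" again, after its state has expired
]
kept = list(dedupe_within_window(events))
```

The point of the model is that memory use is bounded by how many distinct uids occur within one window, rather than growing with the whole stream.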

Upvotes: 1
