santanu mohanty

Reputation: 93

Scala Spark Structured Streaming receiving duplicate messages

I am using Kafka and Spark 2.4.5 Structured Streaming. I am computing an average, but I am running into issues because I receive duplicate records from the Kafka topic in the current batch.

For example, these Kafka topic messages are received in the first batch (output mode is update):

car,Brand=Honda,speed=110,1588569015000000000
car,Brand=ford,speed=90,1588569015000000000
car,Brand=Honda,speed=80,15885690150000000000

The result is the average speed per car brand per timestamp, i.e. grouping by timestamp 1588569015000000000 and Brand=Honda gives

(110 + 90) / 2 = 100

Now, in a later batch, late data arrives containing a duplicate message with the same timestamp:

car,Brand=Honda,speed=50,1588569015000000000
car,Brand=Honda,speed=50,1588569015000000000

I expect the average to update to (110 + 90 + 50) / 3 = 83.33,
but it updates to (110 + 90 + 50 + 50) / 4 = 75, which is wrong.
import org.apache.spark.sql.DataFrame

val rawDataStream: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", "topic1")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) as data")

Then I group by timestamp and brand, compute the average, and write the result to Kafka with a checkpoint.
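Roughly, the rest of the pipeline looks like the following (the parsing logic, output topic name and checkpoint path are simplified placeholders for my actual code):

import org.apache.spark.sql.functions._

// Parse the CSV-style Kafka value into brand, speed and timestamp columns
val parsed = rawDataStream
  .select(split(col("data"), ",").as("parts"))
  .select(
    split(col("parts").getItem(1), "=").getItem(1).as("brand"),
    split(col("parts").getItem(2), "=").getItem(1).cast("double").as("speed"),
    col("parts").getItem(3).as("ts"))

// Average speed per brand per timestamp
val averaged = parsed
  .groupBy(col("ts"), col("brand"))
  .agg(avg(col("speed")).as("avg_speed"))

// Write the running averages back to Kafka in update mode with checkpointing
val query = averaged
  .selectExpr("CAST(brand AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("topic", "outTopic")
  .option("checkpointLocation", "/tmp/checkpoints/avg-speed")
  .outputMode("update")
  .start()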

How should I handle this with Spark Structured Streaming, or is there something wrong with my code?

Upvotes: 1

Views: 2130

Answers (1)

conetfun

Reputation: 1615

Spark Structured Streaming allows deduplication on a streaming DataFrame using dropDuplicates. You specify the fields that identify a duplicate record; across batches, Spark will keep only the first record per combination and discard any later records with the same values.

The snippet below will deduplicate your streaming DataFrame on the combination of Brand, speed and timestamp.

rawDataStream.dropDuplicates("Brand", "speed", "timestamp")
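As an untested sketch, assuming the raw Kafka value has already been parsed into Brand, speed and timestamp columns (parsedStream below stands in for that parsed DataFrame), the deduplication sits just before your aggregation:

import org.apache.spark.sql.functions._

// Drop exact duplicates before aggregating, so a late copy of an
// already-seen record does not skew the average
val deduped = parsedStream.dropDuplicates("Brand", "speed", "timestamp")

val averaged = deduped
  .groupBy(col("timestamp"), col("Brand"))
  .agg(avg(col("speed")).as("avg_speed"))

Note that without a watermark, Spark keeps every seen combination in state indefinitely; if you have an event-time column, you can call withWatermark before dropDuplicates to bound that state.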

Refer to the Spark documentation here.

Upvotes: 1
