koiralo

Reputation: 23109

How to deduplicate messages while streaming from Kafka using Spark Streaming?

I have a case where Kafka producers send the data twice a day. These producers read all the data from the database/files and send it to Kafka, so the messages sent every day are duplicates. I need to deduplicate the messages and write them to persistent storage using Spark Streaming. What is the best way to remove the duplicate messages in this case?

Each duplicate message is a JSON string in which only the timestamp field is updated.

Note: I can't change the Kafka producer to send only the new data/messages; it's already installed on the client machine and was written by someone else.

Upvotes: 4

Views: 7771

Answers (6)

rogerthat

Reputation: 21

Have you looked into this: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication

You can try using the dropDuplicates() method. If more than one column needs to be used to determine the duplicates, you can use dropDuplicates(String[] colNames) to pass them.
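
For example, here is a minimal Structured Streaming sketch in Scala; the broker address, topic name, field names, and output/checkpoint paths are assumptions, and the JSON's stable "id" field is used as the deduplication key:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder.appName("kafka-dedup").getOrCreate()
    import spark.implicits._

    // Assumed payload: an "id" that is stable across resends plus the
    // "timestamp" the producer rewrites; adjust names to the real JSON.
    val schema = new StructType()
      .add("id", StringType)
      .add("payload", StringType)
      .add("timestamp", TimestampType)

    val messages = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "source-topic")                 // assumed topic name
      .load()
      .select(from_json($"value".cast("string"), schema).as("msg"))
      .select("msg.*")

    // Keep only the first occurrence of each id; the re-sent copy
    // (same id, new timestamp) is dropped.
    val deduped = messages.dropDuplicates("id")

    deduped.writeStream
      .format("parquet")
      .option("path", "/data/deduped")           // assumed output path
      .option("checkpointLocation", "/data/chk") // assumed checkpoint path
      .start()
      .awaitTermination()

Note that without a watermark the deduplication state grows indefinitely; the linked guide shows how to combine withWatermark with dropDuplicates to bound it.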

Upvotes: 0

wandermonk

Reputation: 7356

You can use a key-value datastore where the key is a combination of the fields excluding the timestamp field, and the value is the actual JSON.

As you poll the records, create the key-value pair and write it to a datastore that handles UPSERTs (insert + update), or check whether the key already exists in the datastore and drop the message if it does:

    if (datastore.exists(key)) {
        // key already seen: drop the message
    } else {
        // new key: write the message and record the key
        datastore.put(key, json)
    }

I suggest you look at HBase (which handles UPSERTs) and Redis (an in-memory KV datastore commonly used for lookups).
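
A minimal sketch of this check-then-write idea in Scala using the Jedis client for Redis; the host, port, and the way the key is derived are assumptions:

    import redis.clients.jedis.Jedis

    val jedis = new Jedis("localhost", 6379) // assumed Redis host/port

    // Build the key from every field except the timestamp, e.g. a hash of
    // the JSON string with the timestamp removed.
    def processIfNew(key: String, json: String): Unit = {
      // SETNX writes only if the key is absent and returns 1 in that case,
      // so the existence check and the insert are a single atomic operation.
      if (jedis.setnx(key, json) == 1L) {
        // first occurrence: forward the record to persistent storage (stubbed here)
        println(s"writing new record: $json")
      } // otherwise it is a duplicate and can be dropped
    }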

Upvotes: 0

Akhil Bojedla

Reputation: 2218

A much simpler approach would be to solve this at the Kafka end. Have a look at Kafka's log compaction feature. It will deduplicate the records for you, provided the records have the same unique key.

https://kafka.apache.org/documentation/#compaction
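
If you can route the data through a compacted topic, here is a sketch of creating one with the Kafka AdminClient in Scala; the topic name, partition/replica counts, and broker address are assumptions:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
    import org.apache.kafka.common.config.TopicConfig

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
    val admin = AdminClient.create(props)

    // Hypothetical compacted topic; producers must key each record with a
    // stable id so compaction can collapse the resent copies.
    val compacted = new NewTopic("daily-export-compacted", 3, 1.toShort)
      .configs(Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG,
                                        TopicConfig.CLEANUP_POLICY_COMPACT))

    admin.createTopics(Collections.singletonList(compacted)).all().get()
    admin.close()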

Upvotes: 0

Eugene Lopatkin

Reputation: 2737

You could try to use mapWithState. Check my answer.
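
For reference, a minimal sketch of keyed deduplication with mapWithState in Scala; the input DStream of (dedupKey, json) pairs and the Boolean "seen" state are assumptions, and checkpointing must be enabled on the StreamingContext:

    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    // keyed: (dedupKey, json) pairs, where the key is built from every field
    // except the timestamp; returns only the first occurrence of each key.
    def dedup(keyed: DStream[(String, String)]): DStream[(String, String)] = {
      val spec = StateSpec.function(
        (key: String, value: Option[String], state: State[Boolean]) => {
          if (state.exists()) {
            None                     // key already seen: drop the duplicate
          } else {
            state.update(true)       // remember this key for later batches
            value.map(v => (key, v)) // first occurrence: pass it through
          }
        })
      keyed.mapWithState(spec).flatMap(opt => opt) // drop the Nones
    }

In practice you would also put a timeout on the StateSpec so the per-key state doesn't grow forever.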

Upvotes: 0

Kamal Chandraprakash

Reputation: 2002

You can change the topic configuration to compact mode. With compaction, a record with the same key will be overwritten/updated in the Kafka log, so you get only the latest value for a key from Kafka.

You can read more about compaction here.
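
For instance, a sketch of switching an existing topic to compaction with the Kafka AdminClient (Kafka 2.3+); the broker address and topic name are assumptions:

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource
    import scala.collection.JavaConverters._

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker
    val admin = AdminClient.create(props)

    // Switch the (assumed) existing topic to the compact cleanup policy.
    val topic = new ConfigResource(ConfigResource.Type.TOPIC, "source-topic")
    val setCompact = new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"),
                                       AlterConfigOp.OpType.SET)
    val ops: java.util.Collection[AlterConfigOp] = List(setCompact).asJavaCollection

    admin.incrementalAlterConfigs(Map(topic -> ops).asJava).all().get()
    admin.close()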

Upvotes: 1

Natalia

Reputation: 4532

For deduplication, you need to store information somewhere about what has already been processed (for example, unique message ids).

To store this information you can use:

  1. Spark checkpoints. Pros: works out of the box. Cons: if you update the source code of the app, you need to clear the checkpoints and, as a result, you lose that information. This can work if the deduplication requirements are not strict.

  2. any database. For example, if you are running in a Hadoop environment, you can use HBase. For every message you do a 'get' (to check that it wasn't processed before), and mark it as processed in the DB only once it is really written out (see the sketch below).
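
A minimal sketch of option 2 with the HBase client; the table name, column family, and the choice of message id as row key are assumptions:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
    import org.apache.hadoop.hbase.util.Bytes

    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    // hypothetical table "processed_messages" with column family "m"
    val table = connection.getTable(TableName.valueOf("processed_messages"))

    // 'get' step: has this message id been processed before?
    def alreadyProcessed(messageId: String): Boolean =
      table.exists(new Get(Bytes.toBytes(messageId)))

    // mark step: record the id only after the message is really written out
    def markProcessed(messageId: String): Unit = {
      val put = new Put(Bytes.toBytes(messageId))
      put.addColumn(Bytes.toBytes("m"), Bytes.toBytes("seen"), Bytes.toBytes(true))
      table.put(put)
    }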

Upvotes: 1
