Reputation: 23109
I have a case where Kafka producers send data twice a day. These producers read all the data from the database/files and send it to Kafka, so the messages sent each day contain duplicates. I need to deduplicate the messages and write them to persistent storage using Spark Streaming. What is the best way to remove the duplicate messages in this case?
Each duplicate message is a JSON string in which only the timestamp field is updated.
Note: I can't change the Kafka producer to send only new data/messages; it is already installed on the client machine and was written by someone else.
Upvotes: 4
Views: 7771
Reputation: 21
Have you looked into this: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication
You can try using the dropDuplicates() method. If more than one column is needed to determine the duplicates, you can use dropDuplicates(String[] colNames) to pass them.
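For example, a minimal sketch in Scala (the broker address, topic name, and JSON schema are placeholders for your setup):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder.appName("kafka-dedup").getOrCreate()

// Hypothetical schema: adjust the field names to match your JSON payload
val schema = new StructType()
  .add("id", StringType)
  .add("payload", StringType)
  .add("timestamp", StringType)

val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "my-topic")                  // placeholder
  .load()
  .select(from_json(col("value").cast("string"), schema).as("msg"))
  .select("msg.*")

// Deduplicate on every field except the timestamp, since the timestamp
// is the only field that changes between the two daily sends
val deduped = messages.dropDuplicates("id", "payload")

Note that on an unbounded stream dropDuplicates keeps all previously seen keys in state; the guide linked above shows how to bound that state with a watermark if needed.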
Upvotes: 0
Reputation: 7356
You can use a key-value datastore where the key is a combination of the fields excluding the timestamp field, and the value is the actual JSON.
As you poll the records, build the key-value pair and write it to a datastore that handles UPSERTs (insert + update), or first check whether the key already exists in the datastore and drop the message if it does:
if (datastore.exists(key)) {
  // key already seen: drop the duplicate message
} else {
  // first time this key is seen: keep the message and record the key
  datastore.put(key, json)
}
I suggest you check HBase (which handles UPSERTs) and Redis (an in-memory KV datastore commonly used for lookups).
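For example, with Redis and the Jedis client the check and the write can be collapsed into one atomic SETNX call (host, port, and the key format are assumptions):

import redis.clients.jedis.Jedis

val jedis = new Jedis("redis-host", 6379) // placeholder host/port

// Hypothetical key built from the identifying fields, excluding the timestamp
def dedupKey(id: String): String = s"dedup:$id"

// SETNX writes the key only if it does not already exist and returns 1 in that case,
// so checking for a duplicate and recording the key happen in a single atomic step
def isNew(id: String, json: String): Boolean =
  jedis.setnx(dedupKey(id), json) == 1L

// if (isNew(recordId, jsonString)) write to persistent storage, otherwise drop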
Upvotes: 0
Reputation: 2218
A much simpler approach would be to solve this at the Kafka end. Have a look at Kafka's log compaction feature. It will deduplicate the records for you, provided the records have the same unique key.
https://kafka.apache.org/documentation/#compaction
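If you can change the topic configuration, a sketch of switching an existing topic to compaction with the Kafka AdminClient (broker address and topic name are placeholders; the same change can also be made with the kafka-configs command-line tool):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092") // placeholder

val admin = AdminClient.create(props)
val topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic") // placeholder topic

// Set cleanup.policy=compact so Kafka eventually keeps only the latest record per key
val op = new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET)
val ops: java.util.Collection[AlterConfigOp] = Collections.singletonList(op)
admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get()
admin.close()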
Upvotes: 0
Reputation: 2002
You can change the topic configuration to compact mode. With compaction, a record with the same key will be overwritten/updated in the Kafka log, so you get only the latest value for a key from Kafka.
You can read more about compaction in the Kafka documentation: https://kafka.apache.org/documentation/#compaction
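Compaction only collapses records that share a key, so it relies on the producer publishing keyed records (which, per the question, cannot be changed here). For completeness, a keyed send would look roughly like this (broker, topic, key, and payload are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put("bootstrap.servers", "broker:9092") // placeholder
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)

// The key should identify the record (e.g. the JSON fields minus the timestamp);
// compaction then keeps only the latest value published for each key
val recordKey = "record-id-123"                                // hypothetical key
val jsonValue = """{"id":"record-id-123","timestamp":"..."}""" // hypothetical payload
producer.send(new ProducerRecord[String, String]("my-topic", recordKey, jsonValue))
producer.close()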
Upvotes: 1
Reputation: 4532
For deduplication, you need to store information somewhere about what has already been processed (for example, unique IDs of the messages).
To store this information you can use:
Spark checkpoints. Pros: available out of the box. Cons: if you update the application's source code, you need to clear the checkpoints and, as a result, you lose the stored information. This solution can work if the deduplication requirements are not strict.
Any database. For example, if you are running in a Hadoop environment, you can use HBase. For every message you do a 'get' (to check that it wasn't sent before), and mark it as sent in the DB once it has really been sent (see the sketch below).
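A sketch of the HBase variant (the table name and column family are hypothetical, and the table must already exist with that family):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = conn.getTable(TableName.valueOf("processed_messages")) // hypothetical table

// Row key = unique id of the message (or a hash of the fields minus the timestamp)
def alreadyProcessed(messageId: String): Boolean =
  table.exists(new Get(Bytes.toBytes(messageId)))

def markProcessed(messageId: String): Unit =
  table.put(new Put(Bytes.toBytes(messageId))
    .addColumn(Bytes.toBytes("d"), Bytes.toBytes("sent"), Bytes.toBytes("1")))

// if (!alreadyProcessed(id)) { write to storage; markProcessed(id) }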
Upvotes: 1