Reputation: 501
I have a scenario where a topic_source contains all of the messages generated by another application. These JSON messages might be duplicated, so I need to deduplicate them based on a "window" size: say, for every 10 seconds, if there are duplicates from topic_source, I send only the deduplicated messages (deduplicated by message_id) to topic_target.
For this I am using a KStream: I read from topic_source, group by message_id with a windowed "count" aggregation, and for each entry I send one message to topic_target.
Something like the below:
final KStream<String, Output> msgs = builder.stream("topic_source",
        Consumed.with(Serdes.String(), new JsonSerde<>(Output.class)));

final KTable<Windowed<String>, Long> counts = msgs
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .count();

counts.toStream()
        .map((key, value) -> KeyValue.pair(key.key(), new Output(key.key(), value, key.window().start())))
        .to("topic_target", Produced.with(Serdes.String(), new JsonSerde<>(Output.class)));
This works fine on my local machine (standalone, Eclipse IDE on Windows) when tested.
But when I deploy the service/application on Kubernetes pods and test it, I find that topic_target receives as many messages as topic_source, i.e. no deduplication is happening.
I think the topic_source messages are processed on different pods, and the aggregation across pods does not result in a single group-by (message_id) set, i.e. each pod groups by message_id independently and sends its own deduplicated messages to topic_target, so the accumulated result still contains duplicates.
Is there any way to solve this issue on a Kubernetes cluster? I.e. is there any way for all pods together to group by on one set, and send one distinct/deduplicated message set to topic_target?
To achieve this, what features of Kubernetes/Docker should I use? Is there a design mechanism/pattern I should follow?
Any advice is highly appreciated.
Upvotes: 0
Views: 52
Reputation: 176
Who processes which messages depends on your partition assignment. Even if you have multiple pods, Kafka Streams will allocate the same partitions to the same pods. So pod 1 will have partition 1 of input_topic and partition 1 of whatever other topic your application is consuming.
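In particular, deduplication across pods only works if all records with the same message_id land in the same partition, which means message_id has to be the record key. A minimal sketch of re-keying before the windowed count, assuming your Output class exposes a getMessageId() accessor (that accessor is an assumption, not something from your code):
// Hypothetical re-keying: selectKey() triggers a repartition, so all records
// with the same message_id end up in the same partition and therefore on the
// same pod before the windowed count runs.
final KStream<String, Output> byMessageId = msgs
        .selectKey((key, value) -> value.getMessageId());

final KTable<Windowed<String>, Long> counts = byMessageId
        .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Output.class)))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .count();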
Given the specificity of your needs, which would be possible to implement with standard operators, I'd probably implement this with the Processor API. It requires an extra changelog topic, versus the repartition you'll need for grouping by key.
The processor code would look something like this:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class DeduplicationTimedProcessor<Key, Value> implements Processor<Key, Value, Key, Value> {

    private final String storeName;
    private final long deduplicationOffset;

    private ProcessorContext<Key, Value> context;
    private KeyValueStore<Key, TimestampedValue<Value>> deduplicationStore;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class TimestampedValue<Value> {
        private long timestamp;
        private Value value;
    }

    // Store needed for deduplication - means one changelog topic
    public DeduplicationTimedProcessor(String storeName, long deduplicationOffset) {
        this.storeName = storeName;
        this.deduplicationOffset = deduplicationOffset;
    }

    @Override
    public void init(ProcessorContext<Key, Value> context) {
        this.context = context;
        this.deduplicationStore = context.getStateStore(storeName);
    }

    @Override
    public void process(Record<Key, Value> record) {
        var key = record.key();
        var value = record.value();
        var timestamp = context.currentSystemTimeMs(); // Uses System.currentTimeMillis() by default but easier for testing

        var previousValue = deduplicationStore.get(key);

        // New key - nothing to deduplicate against - store + forward
        if (previousValue == null) {
            deduplicationStore.put(key, new TimestampedValue<>(timestamp, value));
            context.forward(new Record<>(key, value, timestamp));
            return;
        }

        // Previous value exists - drop the record if it is a duplicate within the window
        if (previousValue.getValue().equals(value) && timestamp - previousValue.getTimestamp() < deduplicationOffset) {
            // Skip this message as a duplicate within the window
            return;
        }

        deduplicationStore.put(key, new TimestampedValue<>(timestamp, value));
        context.forward(new Record<>(key, value, timestamp));
    }
}
Added a few comments for clarity in there.
Please be mindful that cleanup of the store rests with you, otherwise at some point you'll run out of disk space. Since you mentioned that your operation is for analytics, I'd probably utilize a punctuator to routinely clean up everything that is appropriately "old".
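A minimal sketch of such a punctuator, to be registered at the end of init() above; the five-minute interval and the wall-clock punctuation type are assumptions, and it needs imports for Duration, PunctuationType, KeyValue and KeyValueIterator:
// Hypothetical cleanup punctuator: scan the store periodically and delete
// entries that are older than the deduplication window.
context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, now -> {
    try (KeyValueIterator<Key, TimestampedValue<Value>> it = deduplicationStore.all()) {
        while (it.hasNext()) {
            KeyValue<Key, TimestampedValue<Value>> entry = it.next();
            if (now - entry.value.getTimestamp() > deduplicationOffset) {
                deduplicationStore.delete(entry.key);
            }
        }
    }
});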
To use the processor, use the process method (in older versions of Kafka Streams, transform).
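A minimal sketch of that wiring, assuming Kafka Streams 3.3+ (where process() returns a KStream), the JsonSerde from the question, and illustrative names such as "dedup-store":
StreamsBuilder builder = new StreamsBuilder();

// State store backing the processor - this is the extra changelog topic.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("dedup-store"),
        Serdes.String(),
        new JsonSerde<>(DeduplicationTimedProcessor.TimestampedValue.class)));

builder.stream("topic_source", Consumed.with(Serdes.String(), new JsonSerde<>(Output.class)))
        // Assumes message_id is (or has been made) the record key, see the re-keying sketch above.
        .process(() -> new DeduplicationTimedProcessor<String, Output>("dedup-store", Duration.ofSeconds(10).toMillis()),
                 "dedup-store")
        .to("topic_target", Produced.with(Serdes.String(), new JsonSerde<>(Output.class)));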
Upvotes: 2
Reputation: 372
There are two things that jump to mind:
Are you setting the application.id properly for all of the pods? If the application.id is different across pods, each of them will process all of the messages once. If it's the same, then the messages will be split between the pods.
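A minimal sketch of pinning it, assuming the Streams configuration is built in code (the id and bootstrap servers values are illustrative):
// Every pod must use the same application.id so that they join one consumer
// group and split the input partitions between them.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-service");   // identical on every pod
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");   // illustrative
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();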
Upvotes: 1