Reputation: 61
I have a Kafka cluster with a single topic. Three different consumer groups consume the same messages from this topic and process them differently, each according to its own logic.
Is there any problem with using the same topic for multiple consumer groups?
I ask because I am trying to implement an exception topic and reprocess failed messages. Suppose I have the message "secret" in topic A. All 3 of my consumer groups take the message "secret". 2 of the consumer groups process the message successfully, but the third one fails to process it.
So I put the message in a topic "failed_topic". I want the failed consumer to retry this message, but if I put it back in the actual topic A, the other 2 consumer groups will process it a second time.
Can someone please tell me how I can implement reprocessing correctly for this scenario?
Upvotes: 2
Views: 3287
Reputation: 3579
First of all, in Kafka each consumer group has its own offset for every topic partition it subscribes to, and these offsets are managed separately per consumer group. So a failure in one consumer group doesn't affect the other consumer groups.
You can check current offsets for a consumer group with this cli command:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
is there any problem with using the same topic for multiple consumer groups?
No, there is no problem. In fact, this is the normal behaviour of the topic-based publish/subscribe pattern.
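To illustrate: each group only needs a distinct group.id; the rest of the consumer configuration can be identical, and every group then receives its own copy of every message. A minimal sketch of the configuration (the broker address and group names are placeholders):

```java
import java.util.Properties;

public class GroupConfigs {
    // Build consumer properties; only group.id differs between the three groups
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", groupId);                   // distinct per consumer group
        props.put("enable.auto.commit", "false");         // commit manually after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Three groups subscribed to topic A each get the full message stream
        for (String group : new String[] {"group-1", "group-2", "group-3"}) {
            System.out.println(group + " -> group.id=" + consumerProps(group).getProperty("group.id"));
        }
    }
}
```

Each of these three configurations would be passed to its own KafkaConsumer instance subscribing to the same topic.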
To implement re-processing logic there are some important points to consider:
- max.poll.interval.ms: if poll() is not called within this interval, your consumer will be considered dead and its partitions will be revoked.
- max.poll.records: the maximum number of records returned in a single call to poll().
- When you poll() again, you get the next batch of messages. So to reprocess failed messages you need to call the seek method: public void seek(TopicPartition partition, long offset) : Overrides the fetch offsets that the consumer will use on the next poll(timeout)
Let's try to write some pseudocode to implement re-process logic in case of exception, using this information:
// assumes: a KafkaConsumer&lt;String, String&gt; field named consumer, already
// subscribed to the topic, configured with enable.auto.commit=false
public void consumeLoop() {
    while (true) {
        // max.poll.records = 1, so each poll returns at most one record
        ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord&lt;String, String&gt; record : records) {
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            try {
                processMessage(record);
            } catch (Exception e) {
                // rewind so the next poll() fetches this same record again
                consumer.seek(partition, record.offset());
                continue;
            }
            // commit only after successful processing
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
        }
    }
}
Notes about the code:
- After seek, the very next poll() returns the same record, so a message that keeps failing will be retried forever. In practice you should limit the number of retries and then move the message to a dead-letter topic.
- Offsets are committed manually with commitSync, so enable.auto.commit must be set to false.
Upvotes: 3
Reputation: 191983
is there any problem with using the same topic for multiple consumer groups?
Not at all
if I put it back in the actual topic A, the other 2 consumer groups will process it a second time.
Exactly, and you would create a loop (the third group would fail, put the message back, the other two would accept it again, the third would fail again, and so on).
Basically, you are asking about a "dead-letter queue", which would be a separate topic for each consumer group. Kafka can hold tens of thousands of topics, so this shouldn't be an issue in your use case.
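One way to sketch this (the names below are hypothetical conventions, not a Kafka API): derive a dead-letter topic name from the source topic and the group.id, and cap the number of retries before a record is routed there instead of being retried in place:

```java
public class DeadLetterRouting {
    // Hypothetical naming convention: one dead-letter topic per consumer group
    static String deadLetterTopicFor(String sourceTopic, String groupId) {
        return sourceTopic + ".DLT." + groupId;
    }

    // Decide whether to keep retrying in place (via seek) or ship to the DLQ
    static boolean shouldSendToDeadLetter(int attempts, int maxRetries) {
        return attempts >= maxRetries;
    }

    public static void main(String[] args) {
        System.out.println(deadLetterTopicFor("A", "group-3")); // A.DLT.group-3
        System.out.println(shouldSendToDeadLetter(3, 3));       // true
    }
}
```

In the failing consumer's poll loop, you would count attempts per record; below the limit you call seek() to retry, and once the limit is reached you produce the record to deadLetterTopicFor(...) with a regular KafkaProducer and commit the offset so the other groups are never affected.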
Upvotes: 0