khaja

Reputation: 61

How to handle exceptions and message reprocessing in Apache Kafka

I have a Kafka cluster with a single topic. Three different consumer groups read the same messages from this topic and process them differently, each according to its own logic.

Is there any problem with using the same topic for multiple consumer groups?

I ask because I am trying to implement an exception topic and reprocess failed messages. Suppose I have the message "secret" in topic A. All 3 of my consumer groups took the message "secret". Two of them processed it successfully, but one consumer group failed to process it.

So I put the message in a topic called "failed_topic". I want the failed consumer to retry this message, but if I put it back in the actual topic A, the other 2 consumer groups will process it a second time.

Can someone please tell me how to implement reprocessing correctly for this scenario?

Upvotes: 2

Views: 3287

Answers (2)

H.Ç.T

Reputation: 3579

First of all, in Kafka each consumer group has its own offset for every topic-partition it subscribes to, and these offsets are managed separately per consumer group. So a failure in one consumer group doesn't affect the other consumer groups.

You can check the current offsets for a consumer group with this CLI command:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

is there any problem with creating same topic for multiple consumer groups

No, there is no problem. This is actually the normal behaviour of the topic-based publish/subscribe pattern.


To implement re-processing logic there are some important points to consider:

  • You should keep calling poll() even while you are re-processing the same message. Otherwise, after max.poll.interval.ms your consumer will be considered dead and its partitions will be revoked.
  • Each poll() returns messages that your consumer group has not read yet: one poll() gives you up to max.poll.records messages, and the next poll() gives you the next batch. So to re-process failed messages you need to call the seek method.

public void seek(TopicPartition partition, long offset) : Overrides the fetch offsets that the consumer will use on the next poll(timeout)

  • Ideally the number of consumers in a consumer group should equal the number of partitions of the subscribed topic. Kafka takes care of assigning partitions to consumers evenly (one partition per consumer). But even if this condition is satisfied at the beginning, a consumer may die after a while and Kafka may assign more than one partition to a single consumer. This can lead to problems: suppose your consumer is responsible for two partitions. A poll() may then return messages from both partitions, and when one message cannot be consumed you should seek all of the assigned partitions (not just the one the failed message came from). Otherwise you may skip some messages.
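To make the last point concrete, here is a small broker-free sketch of computing, for each partition in the current batch, the earliest unprocessed offset to seek back to after a failure. The helper name rewindTargets and the string partition keys are illustrative; real code would key on org.apache.kafka.common.TopicPartition:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RewindTargets {
    // Each entry is (topic-partition key, record offset) for a record that was
    // fetched by poll() but not yet successfully processed. In real code the
    // key would be an org.apache.kafka.common.TopicPartition.
    static Map<String, Long> rewindTargets(List<Map.Entry<String, Long>> unprocessed) {
        Map<String, Long> targets = new HashMap<>();
        for (Map.Entry<String, Long> rec : unprocessed) {
            // Keep the smallest offset seen per partition: seeking there makes
            // the next poll() re-deliver every unprocessed record, skipping none.
            targets.merge(rec.getKey(), rec.getValue(), Math::min);
        }
        return targets;
    }
}
```

Passing each resulting entry to consumer.seek(...) rewinds every assigned partition that still has unprocessed records, not just the partition the failed message came from.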

Let's write some pseudocode that implements the re-processing logic for the exception case, using the points above:

public void consumeLoop() {
    // assumes max.poll.records = 1 and enable.auto.commit = false
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            try {
                processMessage(record);
            } catch (Exception e) {
                // rewind so the next poll() returns the same message again
                consumer.seek(tp, record.offset());
                continue;
            }
            // commit manually, only after successful processing
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
        }
    }
}

Notes about the code:

  • max.poll.records is set to 1 to keep the seek logic simple: each poll() returns at most one message.
  • On every exception we seek and then poll() again to get the same message (we have to keep polling so Kafka considers the consumer alive).
  • enable.auto.commit is disabled; offsets are committed manually after successful processing.
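The configuration these notes assume can be sketched with a plain Properties object (the config keys are standard Kafka consumer settings; the helper name and defaults are illustrative):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    static Properties reprocessingProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // One record per poll() keeps the seek logic simple: the failed record
        // is always the only one fetched, so only its partition must be rewound.
        props.put("max.poll.records", "1");
        // Offsets are committed manually (commitSync above), only after success.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

These properties would be passed straight to the KafkaConsumer constructor.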

Upvotes: 3

OneCricketeer

Reputation: 191983

is there any problem with creating same topic for multiple consumer groups?

Not at all

if i keep this message in my actual topic A, the other 2 consumer groups process this message second time.

Exactly, and you would create a loop (the third group fails, puts the message back, the other 2 consume it again, the third fails again, and so on).

Basically, you are asking about a "dead-letter queue", which would be a separate topic for each consumer group. Kafka can hold tens of thousands of topics, so this shouldn't be an issue for your use case.
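A minimal sketch of the per-group dead-letter idea, assuming a simple naming convention (the DeadLetter class and topic-name format are illustrative, not a Kafka API):

```java
public class DeadLetter {
    // One dead-letter topic per (source topic, consumer group) pair, so a
    // failure in one group never re-delivers the message to the other groups.
    static String deadLetterTopicFor(String sourceTopic, String groupId) {
        return sourceTopic + "." + groupId + ".DLT";
    }
}
```

The failing consumer would then produce the record there, e.g. producer.send(new ProducerRecord<>(deadLetterTopicFor(record.topic(), "group-3"), record.key(), record.value())), and a retry consumer belonging to that group alone reads from the dead-letter topic, leaving the other two groups untouched.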

Upvotes: 0
