rocky

Reputation: 5004

Kafka consumer group loses uncommitted messages

I am using a consumer group with just one consumer and just one broker (the wurstmeister Docker image). The code decides whether to commit the offset: if the handler returns an error, the message is not committed. I need to ensure the system does not lose any message, even if that means retrying the same message forever (for now ;) ). To test this I created a simple handler that does not commit the offset when the string 'error' is sent as a message to Kafka. All other strings are committed.

kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited

Now running

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe

returns

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              13              0

so that's OK, there is no lag. Now we pass the 'error' string to simulate that something bad happened, so the message is not committed:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              14              1

The current offset stays at the right position and there is 1 lagged message. Now, if we pass a correct message again, the offset moves on to 15:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test                           0          15              15              0

and message number 14 will never be picked up again. Is this the default behaviour? Do I need to track the last offset and fetch the message at offset+1 manually? I have set the commit interval to 0, hoping to avoid any auto-commit mechanism.

fetch/commit code:

go func() {
    for {
        ctx := context.Background()

        m, err := mr.brokerReader.FetchMessage(ctx)
        if err != nil {
            break
        }

        if err := msgFunc(m); err != nil {
            log.Errorf("# messaging # handler failed, not committing message: %v", err)
            continue // NOTE: this moves on to the next fetch, so the failed message is skipped while the app runs
        }

        // commit message if no error
        if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
            // should we do something other than just logging the uncommitted message?
            log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
        }
    }
}()

reader configuration:

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        brokers,
    GroupID:        groupID,
    Topic:          topic,
    CommitInterval: 0,    // 0 = synchronous commits, no periodic background commit
    MinBytes:       10e3, // 10KB
    MaxBytes:       10e6, // 10MB
})

library used: https://github.com/segmentio/kafka-go

Upvotes: 2

Views: 5288

Answers (3)

Henry Yang

Reputation: 61

It makes sense here to understand the concept of the consumer offset. A running consumer app stores the offset of consumed messages in memory, regardless of whether the offset was committed. If the consumer app is restarted, it retrieves the 'CURRENT-OFFSET' and continues consumption from there.
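That distinction can be modeled without a broker. The tiny `groupState` type below is an illustrative sketch, not Kafka API: while the app runs, the in-memory position keeps advancing past uncommitted messages; only a restart rewinds it to the committed offset:

```go
package main

import "fmt"

// groupState models the two offsets a consumer group juggles.
type groupState struct {
	position  int64 // in-memory: next offset to fetch while the app runs
	committed int64 // durable: what the broker stores for the group
}

func (g *groupState) fetch() int64     { o := g.position; g.position++; return o }
func (g *groupState) commit(off int64) { g.committed = off + 1 }
func (g *groupState) restart()         { g.position = g.committed }

func main() {
	g := &groupState{position: 13, committed: 13}
	g.commit(g.fetch()) // offset 13 processed and committed
	g.fetch()           // offset 14 fetched, handler fails: NOT committed
	fmt.Println(g.fetch()) // 15: the running consumer advances past 14 anyway
	g.restart()
	fmt.Println(g.fetch()) // 14: after a restart, consumption resumes at CURRENT-OFFSET
}
```

This matches the question's observation: message 14 is skipped by the running process, but a restarted consumer would see it again because only offset 14 is committed.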

Upvotes: 1

Andrzej Purtak

Reputation: 991

It looks like your Kafka consumer is set up to commit offsets automatically (that's the default setting).
If so, that's probably why your app skips over the erroneous message: even though you skip the CommitMessages invocation, the commit is performed on a background thread.
Please check the enable.auto.commit property specification in the docs: https://kafka.apache.org/documentation/#newconsumerconfigs

Upvotes: 0

TobiSH

Reputation: 2921

In Kafka you commit offsets, not single messages. If I understand your code right (I'm not a Go developer), you just continue after hitting an invalid message. If a valid message appears after an invalid one, you will commit the offset again - I guess that was not your intention.

Just to make clear what committing an offset means: your consumer group stores the offset in a dedicated internal Kafka topic (or, on older Kafka versions, in ZooKeeper). An offset identifies a single position within a topic (or, to be more precise, within a partition of a given topic). This means you can only consume a topic in a linear fashion.

Here you can see what happens on kafka-consumer side:

New Kafka Consumer

You are consuming from a (most likely multiple) stack(s) of messages. You commit the position (a.k.a. the offset) for that topic/partition, so you cannot say "I want to re-consume a specific message". What you can do is stop consuming once you hit an invalid message. In that case your problem becomes: how do I get rid of this message? Deleting a single message from a Kafka topic is tricky. A common pattern is to write such messages to some kind of dead-letter topic and deal with them with a different consumer.

Hope that made things a little bit clearer to you.

Upvotes: 2
