Reputation: 5004
I am using a consumer group with just one consumer and just one broker (Docker wurstmeister image). The code decides whether or not to commit an offset: if it returns an error, the message is not committed. I need to ensure that the system does not lose any message, even if that means retrying the same message forever (for now ;) ). To test this, I have created a simple handler that does not commit the offset when the string 'error' is sent as a message to Kafka. All other strings are committed.
kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited
Now running
kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe
returns
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 13 13 0
So that's OK, there is no lag. Now we pass the 'error' string to simulate that something bad happened, so the message is not committed:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 13 14 1
The current offset stays at the right position, and there is 1 lagged message. Now, if we pass a correct message again, the offset moves on to 15:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test 0 15 15
and message number 14 will never be picked up again. Is this the default behaviour? Do I need to track the last offset and manually load the message at it+1? I have set the commit interval to 0 in the hope of not using any auto.commit mechanism.
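For reference, the LAG column reported by ConsumerGroupCommand is the partition's log-end offset minus the group's committed offset, so the three snapshots above work out as follows. The last one shows no lag even though the message at offset 13 was never handled successfully: lag only compares two positions and knows nothing about individual messages.

$$
\text{LAG} = \text{LOG-END-OFFSET} - \text{CURRENT-OFFSET}: \qquad 13 - 13 = 0, \qquad 14 - 13 = 1, \qquad 15 - 15 = 0
$$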
fetch/commit code:
go func() {
	for {
		ctx := context.Background()
		m, err := mr.brokerReader.FetchMessage(ctx)
		if err != nil {
			break
		}
		if err := msgFunc(m); err != nil {
			log.Errorf("# messaging # cannot commit a message: %v", err)
			continue
		}
		// commit message if no error
		if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
			// should we do something other than just logging the uncommitted message?
			log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
		}
	}
}()
reader configuration:
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
	Brokers:        brokers,
	GroupID:        groupID,
	Topic:          topic,
	CommitInterval: 0,    // 0 = commits are handled synchronously by CommitMessages
	MinBytes:       10e3, // 10,000 bytes
	MaxBytes:       10e6, // 10,000,000 bytes
})
library used: https://github.com/segmentio/kafka-go
Upvotes: 2
Reputation: 61
Here it makes sense to understand the concept of the consumer offset. While the consumer app is running, it keeps the offset of consumed messages in memory, regardless of whether that offset was committed or not. If the consumer app is restarted, it retrieves the committed offset (the 'CURRENT-OFFSET') and continues consuming from there.
Upvotes: 1
Reputation: 991
It looks like your Kafka consumer is set up to commit offsets automatically (that's the default setting).
If so, that's probably why your app skips over the erroneous message: even though you skip the CommitMessages invocation, a commit is performed on a background thread.
Please check out the enable.auto.commit property specification in the docs: https://kafka.apache.org/documentation/#newconsumerconfigs
Upvotes: 0
Reputation: 2921
In Kafka you commit offsets, not single messages. If I understand your code right (I'm not a Go developer), you just continue after you hit an invalid message. If a valid message appears after the invalid one, you will commit its offset, which moves the committed position past the invalid message as well. I guess that was not your intention.
Just to make clear what submitting or committing an offset means: your consumer group stores the offset in a dedicated internal Kafka topic (`__consumer_offsets`), or in ZooKeeper on older Kafka versions. An offset identifies a single position within a topic (or, to be more precise, within a partition of a given topic). This means you can only consume a partition in a linear fashion.
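To make the "one position per partition" point concrete, here is a deliberately simplified, hypothetical model of a group's committed offsets (`offsetStore`, `topicPartition` and `commit` are illustrative names, not kafka-go API). It assumes the Kafka convention that the committed value is the offset of the next message to read, which matches the CURRENT-OFFSET values in the question (13 after the message at offset 12 is committed), and replays the question's sequence.

```go
package main

import "fmt"

// topicPartition identifies one partition of a topic.
type topicPartition struct {
	Topic     string
	Partition int
}

// offsetStore is a hypothetical, simplified model of a consumer group's
// committed offsets: a single number per partition, not a flag per message.
type offsetStore map[topicPartition]int64

// commit records that the message at msgOffset has been consumed. By Kafka
// convention the stored value is the offset of the NEXT message to read,
// so it implicitly covers every earlier offset in the partition as well.
func (s offsetStore) commit(tp topicPartition, msgOffset int64) {
	s[tp] = msgOffset + 1
}

func main() {
	store := offsetStore{}
	tp := topicPartition{Topic: "test", Partition: 0}

	store.commit(tp, 12) // handled OK, stored offset becomes 13
	// offset 13 is the 'error' message: handling fails, nothing is committed
	store.commit(tp, 14) // next good message, stored offset becomes 15

	// A restarted consumer resumes at the stored offset, so offset 13 is
	// never read again: there is no per-message record that it failed.
	fmt.Println("resume from offset", store[tp]) // resume from offset 15
}
```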
Here you can see what happens on the Kafka consumer side:
You are consuming from one (most likely several) stack(s) of messages, and you commit your position (a.k.a. offset) in each topic/partition. So you cannot say "I want to reconsume this specific message again". What you can do is stop consuming once you hit an invalid message. In that case your problem becomes: how do I get rid of this message? Deleting a single message from a Kafka topic is tricky. A common pattern is to write such messages to some kind of dead-letter topic and deal with them in a different consumer.
Hope that made things a little bit clearer to you.
Upvotes: 2