Reputation: 21245
If I have a enable.auto.commit=false
and I call consumer.poll()
without calling consumer.commitAsync()
after, why does consumer.poll()
return
new records the next time it's called?
Since I did not commit my offset, I would expect poll()
would return the latest offset which should be the same records again.
I'm asking because I'm trying to handle failure scenarios during my processing. I was hoping without committing the offset, the poll()
would return the same records again so I can re-process those failed records again.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
ConsumerRecords<String, LogLine> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord record : records) {
try {
//process record
consumer.commitAsync();
} catch (Exception e) {
}
/**
If exception happens above, I was expecting poll to return new records so I can re-process the record that caused the exception.
**/
}
}
}
}
Upvotes: 22
Views: 8477
Reputation: 1819
I would like to share some code how you can solve this in Java code.
The approach is that you poll the records, try to process them and if an exception occurs, you seek to the minima of the topic partitions. After that, you do the commitAsync()
.
public class MyConsumer implements Runnable {
@Override
public void run() {
while (true) {
List<ConsumerRecord<String, LogLine>> records = StreamSupport
.stream( consumer.poll(Long.MAX_VALUE).spliterator(), true )
.collect( Collectors.toList() );
boolean exceptionRaised = false;
for (ConsumerRecord<String, LogLine> record : records) {
try {
// process record
} catch (Exception e) {
exceptionRaised = true;
break;
}
}
if( exceptionRaised ) {
Map<TopicPartition, Long> offsetMinimumForTopicAndPartition = records
.stream()
.collect( Collectors.toMap( r -> new TopicPartition( r.topic(), r.partition() ),
ConsumerRecord::offset,
Math::min
) );
for( Map.Entry<TopicPartition, Long> entry : offsetMinimumForTopicAndPartition.entrySet() ) {
consumer.seek( entry.getKey(), entry.getValue() );
}
}
consumer.commitAsync();
}
}
}
With this setup, you poll the messages again and again until you successfully process all messages of one poll.
Please note, that your code should be able to handle a poison pill. Otherwise, your code will stuck in an endless loop.
Upvotes: 3
Reputation: 31
Consumer will read from last commit offset if it get re balanced (means if any consumer leave the group or new consumer added) so handling de-duplication does not come straight forward in kafka so you have to store the last process offset in external store and when rebalance happens or app restart you should seek to that offset and start processing or you should check against some unique key in message against DB to find is dublicate
Upvotes: 1
Reputation: 2145
The starting offset of a poll is not decided by the broker but by the consumer. The consumer tracks the last received offset and asks for the following bunch of messages during the next poll.
Offset commits come into play when a consumer stops or fails and another instance that is not aware of the last consumed offset picks up consumption of a partition.
KafkaConsumer has pretty extensive Javadoc that is well worth a read.
Upvotes: 16