Reputation: 280
In our code, we plan to commit offsets manually. Our data processing is long-running, so we follow the pattern suggested earlier.
The code looks roughly like this:
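    while (true) {
        // Keep polling even while paused so the consumer stays in the group;
        // paused partitions simply return no records.
        ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getTopicPolling());
        if (!records.isEmpty()) {
            // Hand the batch to a worker thread for long-running processing.
            task = pool.submit(new ProcessorTask(processor, createRecordsList(records)));
        }
        if (shouldPause(task)) {
            consumer.pause(listener.getPartitions());
        }
        if (isDoneProcessing(task)) {
            // No-argument commit: which offsets does this commit?
            consumer.commitSync();
            consumer.resume(listener.getPartitions());
        }
    }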
If you notice, we commit using commitSync() (without any parameters). Since the consumer is paused, the following iterations return no records, but commitSync() is only called later, once processing has finished. In that case, which offsets would it try to commit? I have read the definitive guide and searched online but cannot find any information about this.
I think we should explicitly save the offsets. But I am not sure if the current code would be an issue.
Any information would be helpful.
Thanks, Prateek
Upvotes: 4
Views: 1558
Reputation: 8335
If you call consumer.commitSync() with no parameters, it commits the offsets returned by the last poll() for all subscribed topics and partitions. Since a single poll() can return many messages, you might want finer control over the commit and explicitly commit a specific offset, such as one based on the latest message that your consumer has successfully processed (the committed value should be that message's offset plus one, because it marks the next record to read). This can be done by calling commitSync(Map<TopicPartition,OffsetAndMetadata> offsets).
You can see the syntax for both ways of calling commitSync in the Consumer Javadoc: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()
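To illustrate the explicit variant, here is a minimal sketch of how the per-partition offsets could be computed from a batch that has finished processing. It deliberately avoids the Kafka classes so it runs on its own: ProcessedRecord and CommitOffsets are hypothetical names, and the "topic-partition" string key is only a stand-in. With the real client, the key would be a TopicPartition and the value an OffsetAndMetadata, passed to commitSync(Map<TopicPartition,OffsetAndMetadata>).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CommitOffsets {

    /** Hypothetical stand-in for the ConsumerRecord fields that matter here. */
    static final class ProcessedRecord {
        final String topic;
        final int partition;
        final long offset;

        ProcessedRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * For each "topic-partition" key, returns the offset to commit:
     * the highest processed offset plus one, i.e. the next record to read.
     */
    static Map<String, Long> offsetsToCommit(List<ProcessedRecord> processed) {
        Map<String, Long> next = new HashMap<>();
        for (ProcessedRecord r : processed) {
            String key = r.topic + "-" + r.partition;
            // Keep the largest "next offset" seen so far for this partition.
            next.merge(key, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<ProcessedRecord> batch = Arrays.asList(
                new ProcessedRecord("events", 0, 41L),
                new ProcessedRecord("events", 0, 42L),
                new ProcessedRecord("events", 1, 7L));
        // In real code, convert each entry to TopicPartition / OffsetAndMetadata
        // and pass the resulting map to consumer.commitSync(offsets).
        System.out.println(offsetsToCommit(batch));
    }
}
```

Building the map from the records the worker actually finished means the commit no longer depends on what the last poll() happened to return, which may be the safer choice when processing runs on another thread.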
Upvotes: 1