Prateek Negi

Apache Kafka: commitSync after pause

In our code, we plan to commit offsets manually. Our data processing is long-running, so we follow the previously suggested pattern:

  1. Read the records
  2. Process the records in a separate thread
  3. Pause the consumer
  4. Continue polling the paused consumer so that it stays alive
  5. When the records are processed, commit the offsets
  6. Once the commit is done, resume the consumer

The code looks roughly like this:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getTopicPolling());
        if (!records.isEmpty()) {
            // hand the batch to a worker thread
            task = pool.submit(new ProcessorTask(processor, createRecordsList(records)));
        }
        if (shouldPause(task)) {
            // stop fetching, but keep calling poll() so the consumer stays alive
            consumer.pause(listener.getPartitions());
        }
        if (isDoneProcessing(task)) {
            consumer.commitSync();
            consumer.resume(listener.getPartitions());
        }
    }

Note that we commit using commitSync() without any parameters. Since the consumer is paused, the following iterations return no records, yet commitSync() is only called later, once processing has finished. In that case, which offsets would it try to commit? I have read the Definitive Guide and searched online but cannot find any information about it.

I think we should explicitly store the offsets, but I am not sure whether the current code would actually cause a problem.

Any information would be helpful.

Thanks, Prateek


Answers (1)

Hans Jespersen

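If you call consumer.commitSync() with no parameters, it commits the offsets returned by the last poll() for all partitions assigned to the consumer, i.e. the position just past the last record your consumer has received. Because poll() returns no records for paused partitions, their positions do not advance while they are paused, so as long as the partitions stay paused, a later commitSync() commits the position just past the batch you handed to the worker thread. Since you can receive many messages in a single poll(), you might want finer control over the commit and explicitly commit a specific offset, such as the one after the latest message your consumer has successfully processed. This can be done by calling commitSync(Map<TopicPartition,OffsetAndMetadata> offsets). Note that the committed offset should be the next message to consume, i.e. the last processed offset + 1.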
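A minimal sketch of building that map, assuming the kafka-clients library is on the classpath. `BatchOffsets` and `toCommit` are hypothetical names used for illustration, not part of the Kafka API. The idea is to snapshot the offsets of a batch at the moment it is handed to the worker, so the commit no longer depends on what the consumer has polled since:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper (not part of kafka-clients).
public final class BatchOffsets {
    private BatchOffsets() {}

    /**
     * Returns the offsets to commit once every record in {@code records} has been
     * processed. The committed offset is the NEXT offset to read, so each partition
     * maps to (offset of its last record + 1).
     */
    public static <K, V> Map<TopicPartition, OffsetAndMetadata> toCommit(ConsumerRecords<K, V> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            if (partitionRecords.isEmpty()) {
                continue; // nothing was received for this partition, so nothing to commit
            }
            // Records within a partition are returned in offset order, so the last one is the highest.
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }
}
```

In the loop from the question, one would call `BatchOffsets.toCommit(records)` right where the batch is submitted, keep the result next to `task`, and pass it to `consumer.commitSync(offsets)` instead of the no-argument `commitSync()` once `isDoneProcessing(task)` is true.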

Both ways of calling commitSync() are documented in the KafkaConsumer Javadoc: http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync()
