Reputation: 1153
I have a single Kafka consumer connected to a topic with 3 partitions. As soon as I get a record from Kafka, I would like to capture its offset and partition. On restart, I would like to restore the consumer's position from the last read offset.
From kafka documentation:
Each record comes with its own offset, so to manage your own offset you just need to do the following:
Configure enable.auto.commit=false
Use the offset provided with each ConsumerRecord to save your position.
On restart restore the position of the consumer using seek(TopicPartition, long).
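Following the first of those steps, a minimal consumer configuration with auto-commit disabled might look like this (the bootstrap address and group id are placeholders; the resulting Properties would be passed to new KafkaConsumer<>(props)):

```java
import java.util.Properties;

public class ManualOffsetConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-group");                // placeholder group id
        // step 1: disable auto-commit so we manage offsets ourselves
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // prints "false": offsets will only move when we commit them explicitly
        System.out.println(consumerProps().getProperty("enable.auto.commit"));
    }
}
```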
Here is my sample code:
constructor {
    // load persisted data into offsetMap<partition, offset>
    initFlag = true;
}

// main method
void run() {
    while (!shutdown) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (initFlag) { // is this the correct way to override the offset position?
            seekToPositions(offsetMap);
            initFlag = false;
            continue; // records polled before seeking may be from the wrong position
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            getOffsetPositions(); // dump offsets and partitions to db/disk
        }
    }
}

// get the current offset per partition and write it to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception {
    Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
    // code to put partition and offset into the map
    // write to disk or db
    return offsetMap;
}

// overrides the fetch offsets that the consumer will use on the next poll
public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
    // for each entry, build the TopicPartition and look up its offset
    consumer.seek(partition, offset);
}
Is this the right way to do it? Is there a better way?
Upvotes: 4
Views: 11068
Reputation: 2029
This can be solved by taking control over when we commit offsets.
The first thing to do is to set 'enable.auto.commit' to 'false' in the consumer application, so you control when the offset is committed.
We use a Map to manually track offsets, as shown below:
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic), new CommitCurrentOffset());
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // process the record (ex : save in DB / call external service etc..)
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1, null)); // 1
        }
        consumer.commitAsync(currentOffsets, null); // 2
    }
}
finally {
    consumer.commitSync(currentOffsets); // 3
}

class CommitCurrentOffset implements ConsumerRebalanceListener { // 4
    public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
        consumer.commitSync(currentOffsets);
    }
    public void onPartitionsAssigned(Collection<TopicPartition> topicPartitions) {
    }
}
As we process each message, we add the offset of the processed message to our map, as below:
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null));
We commit the offsets of the processed messages to the broker asynchronously.
In case of any error/exception while processing a message, the commitSync in the finally block commits the offset of the latest message that was successfully processed for each partition.
When we are about to lose a partition due to rebalancing, we need to commit offsets. Here, we are committing the latest offsets that we have processed (in the for-each loop), not the latest offsets in the batch we are still processing. We achieve this by implementing the ConsumerRebalanceListener interface. Whenever a rebalance is triggered, the onPartitionsRevoked() method will be invoked before rebalancing starts and after the consumer stops processing messages.
Upvotes: 0
Reputation: 8335
If you commit your offsets, Kafka will store them for you (for up to 24 hours by default in that version, controlled by the broker setting offsets.retention.minutes; newer brokers retain them for 7 days by default).
That way if your consumer dies you could start the same code on another machine and continue right from where you left off. No external storage needed.
See "Offsets and Consumer Position" in https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
and I recommend you consider using commitSync.
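That approach can be sketched as below, with placeholder broker address, group id, and topic name (this is only an illustration of the poll-process-commitSync pattern, and needs a running broker plus the kafka-clients dependency to execute):

```java
// Sketch: poll, process, then commit synchronously, so the stored position
// always reflects fully processed records. commitSync blocks until the
// broker acknowledges (or an unrecoverable error occurs).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SyncCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition = %d, offset = %d%n",
                            record.partition(), record.offset());
                }
                // commit only after the whole batch has been processed
                consumer.commitSync();
            }
        }
    }
}
```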
Upvotes: 4
Reputation: 829
That's OK to me; just be careful about how your consumer is built (manual partition assignment or automatic).
If the partition assignment is done automatically special care is needed to handle the case where partition assignments change. This can be done by providing a ConsumerRebalanceListener instance in the call to subscribe(Collection, ConsumerRebalanceListener) and subscribe(Pattern, ConsumerRebalanceListener). For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by implementing ConsumerRebalanceListener.onPartitionsRevoked(Collection). When partitions are assigned to a consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer to that position by implementing ConsumerRebalanceListener.onPartitionsAssigned(Collection).
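A hedged sketch of the listener that quote describes — committing on revoke and seeking to externally stored offsets on assign. Here offsetFromStore is a hypothetical lookup into your db/disk store, not a Kafka API:

```java
// Sketch: restore externally stored offsets when partitions are assigned,
// and commit processed offsets before partitions are taken away.
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class RestorePositionListener implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets;

    RestorePositionListener(KafkaConsumer<String, String> consumer,
                            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        this.consumer = consumer;
        this.currentOffsets = currentOffsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // commit what we have processed before losing the partitions
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Long offset = offsetFromStore(tp); // hypothetical: read from db/disk
            if (offset != null) {
                consumer.seek(tp, offset);     // resume from the stored position
            }
        }
    }

    private Long offsetFromStore(TopicPartition tp) {
        return null; // placeholder: look up the last saved offset for tp
    }
}
```

The listener is attached in the subscribe call, e.g. consumer.subscribe(Collections.singletonList(topic), new RestorePositionListener(consumer, currentOffsets)).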
Upvotes: 0