Reputation: 19208
I am doing something like the following pseudo code
var consumer = new KafkaConsumer<String, String>(props);
consumer.assign(topicPartitions);
var beginOff = consumer.beginningOffsets(topicPartitions);
var endOff = consumer.endOffsets(topicPartitions);
// seek each partition to its last record, but never before its beginning
// (an empty partition has beginningOffset == endOffset)
for (var tp : topicPartitions) {
    consumer.seek(tp, Math.max(beginOff.get(tp), endOff.get(tp) - 1));
}
var lastMessages = consumer.poll(Duration.ofSeconds(1));
// do something with the received messages
consumer.close();
In a simple test this works, but I wonder whether there are cases, such as producer crashes, where offsets are not monotonically increasing by one. In that case, would I have to seek()
my way back in time, or can I get the offset of the last already-produced message from Kafka?
I am not using transactions, so we don't need to worry about read_committed vs. read_uncommitted messages.
Edit: One example where offsets are not consecutive is after log compaction. However, compaction always keeps the last message of a partition, as it is - obviously - more recent than all preceding messages (same key or not). But the offsets just before that last message could theoretically have been compacted away.
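The `Math.max(beginOff, endOff - 1)` clamp in the pseudo code above can be exercised without a broker. Below is a minimal sketch of the per-partition arithmetic, using plain `Map<String, Long>` keyed by partition name as a stand-in for the maps returned by `beginningOffsets`/`endOffsets` (that substitution is an assumption for illustration only):

```java
import java.util.HashMap;
import java.util.Map;

public class LastOffsetDemo {
    // Offset of the last record per partition: max(beginningOffset, endOffset - 1).
    // For an empty partition (begin == end) this clamps to the beginning offset,
    // so we never seek before the start of the log.
    static Map<String, Long> lastOffsets(Map<String, Long> begin, Map<String, Long> end) {
        Map<String, Long> last = new HashMap<>();
        end.forEach((tp, e) -> last.put(tp, Math.max(begin.get(tp), e - 1)));
        return last;
    }

    public static void main(String[] args) {
        Map<String, Long> begin = Map.of("t-0", 0L, "t-1", 5L, "t-2", 7L);
        Map<String, Long> end   = Map.of("t-0", 10L, "t-1", 5L, "t-2", 8L);
        Map<String, Long> last = lastOffsets(begin, end);
        System.out.println(last.get("t-0")); // 9: last record of a populated partition
        System.out.println(last.get("t-1")); // 5: empty partition, clamped to beginning
        System.out.println(last.get("t-2")); // 7: exactly one record, at offset 7
    }
}
```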
Upvotes: 1
Views: 1164
Reputation: 4095
In kafka.apache.org/10/javadoc/, it is clearly stated for consumer.endOffsets:
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
So when you seek to endOff - 1
, you are positioned at the last available Kafka record for that topic partition as of the moment you fetched the end offsets. Producer crashes therefore do not affect this.
One more thing: the offset is not decided by the producer; it is assigned by the partition leader of that topic partition. At write time, offsets within a partition therefore always increase by exactly one (gaps can only appear later, if records are removed by compaction or retention, as noted in your edit).
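This can be sketched with a toy model of a partition log (the PartitionLog class below is hypothetical, not part of the Kafka API; it only illustrates broker-side offset assignment):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionLog {
    // Toy model: the broker (not the producer) assigns offsets
    // sequentially on append, so they increase by exactly one.
    private final List<String> records = new ArrayList<>();

    long append(String record) {   // returns the assigned offset
        records.add(record);
        return records.size() - 1;
    }

    long endOffset() {             // offset of the upcoming record,
        return records.size();     // i.e. last available offset + 1
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        // Two different "producers" appending: offsets are still sequential.
        System.out.println(log.append("from-producer-A")); // 0
        System.out.println(log.append("from-producer-B")); // 1
        System.out.println(log.append("from-producer-A")); // 2
        // endOffset - 1 is the offset of the last available record.
        System.out.println(log.endOffset() - 1);           // 2
    }
}
```

Whichever producer writes, or whether one of them crashes mid-stream, the leader keeps handing out the next sequential offset.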
Upvotes: 1