Reputation: 74
I am looking for a way to consume a specific offset range of messages from my Kafka topic (assume my partition has offsets 200-300, and I want to consume the messages from offset 250 to 270).
I am using the code below, where I can specify the initial offset, but it consumes all messages from 250 until the end. Is there any way, or any attribute available, to set an end offset so that consumption stops at that point?
@KafkaListener(id = "KafkaListener",
    topics = "${kafka.topic.name}",
    containerFactory = "kafkaManualAckListenerContainerFactory",
    errorHandler = "${kafka.error.handler}",
    topicPartitions = @TopicPartition(topic = "${kafka.topic.name}",
        partitionOffsets = {
            @PartitionOffset(partition = "0", initialOffset = "250"),
            @PartitionOffset(partition = "1", initialOffset = "250")
        }))
Upvotes: 2
Views: 8242
Reputation: 635
Calling poll(timeout) has the downside that it both reads existing messages and waits for new ones to be produced, until one of the poll limits is reached.
Example: read the offset of the last message in the partition, seek to that position, then call poll(30s).
If the partition has received 1000 more records in the meantime, the poll returns quickly. If the partition gets no new messages, the poll waits the full 30 seconds for new data.
Setting max.poll.records=1 does not help. From the documentation:
The maximum number of records returned in a single call to poll(). Note, that max.poll.records does not impact the underlying fetching behavior.
Setting fetch.max.bytes=0 should do the trick, however.
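As a minimal sketch of wiring up that setting (the broker address and the String deserializers here are placeholders I am assuming, not values from the question):

```java
import java.util.Properties;

public class BoundedFetchConfig {
    // Builds consumer properties per the suggestion above.
    static Properties boundedFetchProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // fetch.max.bytes=0: the broker still returns the first record batch of
        // the first non-empty partition, but nothing beyond it, so poll() is
        // not held open trying to fill a larger fetch.
        props.put("fetch.max.bytes", "0");
        return props;
    }
}
```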
Upvotes: 0
Reputation: 39930
You can use seek() to force the consumer to start consuming from a specific offset, and then poll() until you reach the target end offset. From the docs:
public void seek(TopicPartition partition, long offset)
Overrides the fetch offsets that the consumer will use on the next poll(timeout). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets.
For example, let's assume you want to start from offset 200:
TopicPartition tp = new TopicPartition("myTopic", 0);
long startOffset = 200L;
long endOffset = 300L;

List<TopicPartition> topics = Arrays.asList(tp);
consumer.assign(topics);
consumer.seek(tp, startOffset);
Now you just need to keep poll()ing until endOffset is reached:
boolean run = true;
while (run) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // Do whatever you want to do with `record`

        // Stop once the end offset has been reached (">=" guards against
        // offset gaps left by compaction or transaction markers)
        if (record.offset() >= endOffset) {
            run = false;
            break;
        }
    }
}
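One caveat worth noting: offsets in a partition are not guaranteed to be contiguous (log compaction and transaction markers leave gaps), so an exact == comparison against endOffset may never match and the loop could run forever. A minimal, broker-free sketch of the safer stop condition:

```java
public class EndOffsetCheck {
    // Returns true once the consumer has read up to (or past) endOffset.
    // ">=" instead of "==" matters because the exact endOffset may not
    // exist in the log; ">=" still terminates on the first record at or
    // beyond the target.
    static boolean reachedEnd(long recordOffset, long endOffset) {
        return recordOffset >= endOffset;
    }
}
```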
Upvotes: 3
Reputation: 395
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

// partition and offset to read the data from (example topic/partition; adjust to your setup)
TopicPartition partitionToReadFrom = new TopicPartition("my-topic", 0);
long offsetToReadFrom = 250L;

// assign the partition manually (no consumer group / subscribe())
kafkaConsumer.assign(Collections.singletonList(partitionToReadFrom));

// seek is mostly used to replay data or fetch a specific message
kafkaConsumer.seek(partitionToReadFrom, offsetToReadFrom);

int numberOfMessagesRead = 0;
boolean keepOnReading = true;

while (keepOnReading) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        numberOfMessagesRead++;
        logger.info("Key: " + record.key() + ", Value: " + record.value());
        logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
        if (record.offset() >= 270L) {
            keepOnReading = false;
            break;
        }
    }
}
I hope this helps you!
Upvotes: 0