Reputation: 2603
We are using ConsumerSeekAware and seekToBeginning while developing a microservice that consumes data from a feed. Once we are ready, we would like the consumer to use the offset instead. I could not work out from the documentation what seekToEnd means. Does it seek to the offset (the last ACKed message), or does it seek to the last message of the topic (regardless of the offset) and wait for new ones?
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;

@Slf4j
public class KafkaSeekOffsetAwareListener implements ConsumerSeekAware {

    protected final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        log.info("Registering seek callback");
        this.seekCallBack.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        log.info("Incoming Topic Partitions {}", assignments);
        // Rewind each assigned partition once (the original nested forEach repeated every seek).
        assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        log.info("Container idle");
    }
}
Upvotes: 0
Views: 3555
Reputation: 121560
See JavaDocs:
/**
* Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
* final offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
* If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
* <p>
* If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
* of the first message with an open transaction.
*
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {
As you can see, this is the plain Apache Kafka client. Spring for Apache Kafka does nothing more than delegate to that functionality. I would even say it is wrong to call Spring for Apache Kafka a "client for Kafka": it is just a wrapper and a high-level Spring API around the standard Apache Kafka client.
Upvotes: 1
Reputation: 174779
seekToEnd means just that: set the position to the END of the partition, regardless of this consumer's "current" position. The next received record will be the next one that is added to the partition after the seek is performed.
Any records published while the consumer was stopped will be skipped.
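To make the difference between the three starting points concrete, here is a minimal sketch using a toy in-memory model. It is not the Kafka client: ToyConsumer, its fields, and its eager seekToEnd are hypothetical stand-ins (the real seekToEnd evaluates lazily on the next poll), and the list index plays the role of the offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition and one consumer; NOT the Kafka client API. */
class SeekDemo {

    /** Hypothetical stand-in for a consumer assigned to a single partition. */
    static class ToyConsumer {
        private final List<String> log; // the partition; list index == offset
        private int position;           // offset of the next record to fetch

        ToyConsumer(List<String> log, int committedOffset) {
            this.log = log;
            this.position = committedOffset; // no seek: resume from the committed offset
        }

        void seekToBeginning() { position = 0; }

        // Simplification: the end is resolved immediately, not lazily as in Kafka.
        void seekToEnd() { position = log.size(); }

        /** Returns every record from the current position and advances past them. */
        List<String> poll() {
            List<String> batch = new ArrayList<>(log.subList(position, log.size()));
            position = log.size();
            return batch;
        }
    }

    public static void main(String[] args) {
        // Four records exist; the group has committed offset 2 (next to read is "c").
        List<String> log = new ArrayList<>(List.of("a", "b", "c", "d"));

        ToyConsumer resumed = new ToyConsumer(log, 2);
        System.out.println("no seek:         " + resumed.poll());

        ToyConsumer rewound = new ToyConsumer(log, 2);
        rewound.seekToBeginning();
        System.out.println("seekToBeginning: " + rewound.poll());

        ToyConsumer skipped = new ToyConsumer(log, 2);
        skipped.seekToEnd();
        System.out.println("seekToEnd:       " + skipped.poll());

        log.add("e"); // a record published after the seek
        System.out.println("after new record: " + skipped.poll());
    }
}
```

In this model the consumer that does not seek picks up "c" and "d", while the one that seeks to the end never sees them and only receives "e". If the goal is to continue from the committed offset once development is done, the corresponding choice would be to not seek at all.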
Upvotes: 0
Reputation: 192013
I was not really able to understand what seekToEnd stands for, because of documentation. Does it mean, it will seek to the offset (the last ACKed message) or it will seek to the last message of the topic (regardless the offset) and waits for the new ones?
Those are the same thing. The last offset in a topic is the last acked one (assuming producers even enabled acks), and seeking to the end moves the "cursor" of consumption to the end regardless of the offsets.
Upvotes: 0