Reputation: 1129
I have one Kafka topic with 8 partitions, consumed by a single consumer that has its own unique consumer group. I want to consume only the recent messages (in my case, those from the last 3 minutes) from all partitions. I used the offsetsForTimes method as shown below.
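List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitionInfos.stream()
        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
        .collect(Collectors.toList());
// 3 minutes before now, in epoch milliseconds
long value = Instant.now().minus(180, ChronoUnit.SECONDS).toEpochMilli();
Map<TopicPartition, Long> topicPartitionTime = topicPartitions.stream()
        .collect(Collectors.toMap(tp -> tp, tp -> value));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(topicPartitionTime);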
The problem is that offsetsForTimes returns offset positions for only one or two partitions and returns null for the rest.
I want to consume the recent messages from all partitions, not just one or two.
I also tried the following:
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(topicPartitionTime);
but I still get only one or two offset positions. In the worst case I sometimes get null offsets for all partitions.
If offsetsForTimes works for only one or two partitions, how can I poll the recent records of all partitions from a single consumer?
EDIT: I'm using a Kafka cluster; the 8 partitions are spread across 3-4 machines.
Additional input: I am able to reproduce the problem with the setup below.
Apache Kafka version: 2.11-2.2.0; Kafka clients jar: 2.0.1
Thanks in advance for any help.
Upvotes: 1
Views: 1878
Reputation: 174514
I can't reproduce your condition; the only time I get null for the offset is when the partition has no record with a timestamp at or after the one I search for. For example, I have 10 partitions but only write to 8:
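@SpringBootApplication
public class So59200574Application implements ConsumerSeekAware {
    public static void main(String[] args) { SpringApplication.run(So59200574Application.class, args); }
    @Bean
    // 10 partitions as stated above; the replication factor of 1 is an assumption
    public NewTopic topic() { return new NewTopic("so59200574", 10, (short) 1); }
    @KafkaListener(id = "so59200574", topics = "so59200574")
    public void listen(String in) { System.out.println(in); }
    @Bean
    public ConsumerAwareRebalanceListener rebal() {
        return new ConsumerAwareRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                final long tenSecondsAgo = System.currentTimeMillis() - 10_000L;
                partitions.forEach(tp -> timestampsToSearch.computeIfAbsent(tp, tp1 -> tenSecondsAgo));
                // Print the lookup result; partitions without a matching record map to null
                System.out.println(consumer.offsetsForTimes(timestampsToSearch));
            }
        };
    }
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 8).forEach(i -> template.send("so59200574", i, null, "foo" + i));
    }
}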
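To make the null case concrete, here is a minimal sketch in plain Java with no Kafka dependency. It models the documented lookup rule (the earliest offset whose timestamp is greater than or equal to the requested one, otherwise null) and shows one possible way to handle the null entries: start those partitions at their end offset. The class and method names (`OffsetLookupSketch`, `offsetForTime`, `resolveStartOffsets`) are hypothetical, and falling back to the end offset is an assumption about what you want, not the only reasonable policy.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetLookupSketch {

    /**
     * Simplified model of the lookup for a single partition.
     * timestamps[i] is the timestamp of the record stored at offset i.
     * Returns the earliest offset whose timestamp is >= target, or null
     * when the partition is empty or all of its records are older.
     */
    public static Long offsetForTime(long[] timestamps, long target) {
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] >= target) {
                return (long) offset;
            }
        }
        return null;
    }

    /**
     * Picks a start position for every partition listed in endOffsets:
     * the looked-up offset when there is one, otherwise the end offset,
     * so that only records arriving from now on are read.
     */
    public static <P> Map<P, Long> resolveStartOffsets(Map<P, Long> found, Map<P, Long> endOffsets) {
        Map<P, Long> start = new HashMap<>();
        endOffsets.forEach((partition, end) -> {
            Long offset = found.get(partition);
            start.put(partition, offset != null ? offset : end);
        });
        return start;
    }

    public static void main(String[] args) {
        long target = 1_000L;

        // Partition 0 has a recent record; partition 1 only has old records.
        long[] partition0 = {400L, 900L, 1_200L};
        long[] partition1 = {100L, 200L};

        Map<Integer, Long> found = new HashMap<>();
        found.put(0, offsetForTime(partition0, target));
        found.put(1, offsetForTime(partition1, target)); // stored as null

        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, (long) partition0.length);
        endOffsets.put(1, (long) partition1.length);

        System.out.println("lookup result:   " + found);
        System.out.println("start positions: " + resolveStartOffsets(found, endOffsets));
    }
}
```

With a real consumer, the `found` map would come from `consumer.offsetsForTimes(...)` (taking `offset()` from each non-null `OffsetAndTimestamp`), the `endOffsets` map from `consumer.endOffsets(partitions)`, and each resulting position would be passed to `consumer.seek(tp, offset)` once the partitions are assigned. If you still see null for partitions that you know contain records from the last 3 minutes, that is a different situation from the one modelled here.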
Upvotes: 0