PrabaharanKathiresan

Reputation: 1129

Kafka consumer offsetsForTimes method returns offsets for only a few partitions, not all

I have one Kafka topic with 8 partitions, consumed by a single consumer that has its own unique consumer group. I want to consume only the recent messages (in my case, from 3 minutes before the current time) from all partitions, so I used the offsetsForTimes method as shown below.

List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitionInfos.stream().......collect(Collectors.toList());
Long value = Instant.now().minus(120, ChronoUnit.SECONDS).toEpochMilli();
Map<TopicPartition, Long> topicPartitionTime = topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> value));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(topicPartitionTime);

The problem is that offsetsForTimes returns offset positions for only one or two partitions and returns null for the rest.

I want to consume the recent messages from all partitions, not just one or two.

I also tried the following:

consumer.unsubscribe();
consumer.assign(allPartitions);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(topicPartitionTime);

but I still get only one or two offset positions. In the worst case, I sometimes get null offsets for all partitions.

If offsetsForTimes works with only one or two partitions, how can I poll the recent records of all partitions from a single consumer?

EDITED: I'm using a Kafka cluster; the 8 partitions are spread across 3-4 machines.

Additional inputs: I am able to reproduce the problem with the scenario below.

  1. Create three topics: A (1 partition), B (10 partitions) and C (10 partitions).
  2. KafkaStreams consumes messages from A and pushes them to B and C.
  3. Push about 100 messages to topic A. KafkaStreams consumes them and pushes them to topics B and C. I can see that the messages are spread over all partitions in B and C (i.e. each of the 10 partitions contains about 10 messages).
  4. Create a single KafkaConsumer consuming topic B, then call the offsetsForTimes method with all partitions and a timestamp of 5 minutes before the current time.
  5. Ensure that consumer.assignment() returns all partitions before calling offsetsForTimes.
  6. offsetsForTimes returns an offset position for a single partition only, but when I call consumer.poll it returns messages from the other partitions too.

I am using Apache Kafka version 2.11-2.2.0 with Kafka clients jar 2.0.1.

Appreciate the help in advance.

Upvotes: 1

Views: 1878

Answers (1)

Gary Russell

Reputation: 174514

I can't reproduce your condition; the only time I get null for the offset is when the partition has no record with a timestamp at or after the one requested (for example, a partition that was never written to). E.g. I have 10 partitions but only write to 8:

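import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

@SpringBootApplication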
public class So59200574Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So59200574Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59200574").partitions(10).replicas(1).build();
    }

    @KafkaListener(id = "so59200574", topics = "so59200574")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ConsumerAwareRebalanceListener rebal() {
        return new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
                final long tenSecondsAgo = System.currentTimeMillis() - 10_000L;
                partitions.forEach(tp -> timestampsToSearch.computeIfAbsent(tp, tp1 -> tenSecondsAgo));
                System.out.println(consumer.offsetsForTimes(timestampsToSearch));
            }

        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 8).forEach(i -> template.send("so59200574", i, null, "foo" + i));
    }

}
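
To make the null results easier to reason about, here is a minimal sketch in plain Java that does not use the Kafka client at all. It assumes the documented lookup rule: for each partition, the result is the earliest offset whose timestamp is at or after the requested timestamp, and null when no such record exists. The class `OffsetLookupSketch`, its method names, and the in-memory "log" (partition number mapped to a list of record timestamps, where the list index stands in for the offset) are all hypothetical and only illustrate the rule. The last method shows one possible fallback: start from the end of any partition that has no match.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory stand-in for the timestamp lookup; this is NOT Kafka code.
class OffsetLookupSketch {

    // Returns the index (standing in for the offset) of the first timestamp >= target,
    // or null if the partition has no such record.
    // Assumption: timestamps within a partition are in non-decreasing order.
    static Long earliestOffsetAtOrAfter(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return (long) offset;
            }
        }
        return null;
    }

    // Looks up every partition; partitions without a matching record map to null.
    static Map<Integer, Long> offsetsForTime(Map<Integer, List<Long>> log, long target) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        log.forEach((partition, timestamps) ->
                result.put(partition, earliestOffsetAtOrAfter(timestamps, target)));
        return result;
    }

    // Chooses a start position per partition: the found offset if there is one,
    // otherwise the end of the partition (so only new records would be read).
    static Map<Integer, Long> startPositions(Map<Integer, List<Long>> log, long target) {
        Map<Integer, Long> positions = new LinkedHashMap<>();
        offsetsForTime(log, target).forEach((partition, offset) ->
                positions.put(partition,
                        offset != null ? offset : (long) log.get(partition).size()));
        return positions;
    }

    public static void main(String[] args) {
        Map<Integer, List<Long>> log = new LinkedHashMap<>();
        log.put(0, List.of(1_000L, 2_000L, 9_000L)); // has a record after the target
        log.put(1, List.of(1_000L, 2_000L));         // only records older than the target
        log.put(2, List.of());                       // never written to

        System.out.println(offsetsForTime(log, 5_000L)); // {0=2, 1=null, 2=null}
        System.out.println(startPositions(log, 5_000L)); // {0=2, 1=2, 2=0}
    }
}
```

With the real consumer, the equivalent of `startPositions` would probably be a loop over the returned map that calls `consumer.seek(tp, offsetAndTimestamp.offset())` for non-null entries and `consumer.seekToEnd(...)` for the partitions that came back null. Treat that as a suggestion to try, not something I have run against your setup.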

Upvotes: 0
