SmartTechie
SmartTechie

Reputation: 145

How to consume messages from multiple topics?

I am trying to consume messages from multiple topics using assign() method. With my implementation, some times I am able to consume messages from all topics and other times from only one topic. After some research, I found that Kafka by default uses Range assigner. Hence it will not assign all partitions always.

For my use case, I should able to consume from all topics and partitions.

I have tried by setting RoundRobin assigner. But it didn't help

List<TopicPartition> topicPartitions = new ArrayList<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(consumerConfig);
for (String topic : topics) {
   topicPartitions.add(new TopicPartition(topic, 0);
}
kafkaConsumer.assign(topicPartitions);
ConsumerRecords<String, String> records = kafkaConsumer.poll(600);`

Upvotes: 0

Views: 2561

Answers (2)

pgras
pgras

Reputation: 12770

KafkaConsumer.assign is usually used for complex use-cases where you want to control not only the topics but also the partitions you consume. If you simply want to consume from several topics (and all their partitions) you should use KafkaConsumer.subscribe.

consumer.subscribe(Arrays.asList("topic1", "topic2"));

Look at the javadoc javadoc which also shows code examples.

EDIT: if you need to control partition assignment, then you indeed need to use the assign() method, but in your (incomplete) code example it looks like you assign partition 0 of each topic; so you will only consume messages from partition 0.

If you need to control offset manually you still can use subscribe, but you may disable auto-commit and use seek() and commitSync() or commitAsync() to control offset.

Upvotes: 2

Giorgos Myrianthous
Giorgos Myrianthous

Reputation: 39790

The following should do the trick for you:

consumer.subscribe(Arrays.asList("mytopic1","mytopic2"), ConsumerRebalanceListener obj)
// or consumer.subscribe(Arrays.asList(topic1,topic2), new ConsumerRebalanceListener ..)

ConsumerRecords<String, String> records = consumer.poll(600);
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + " -> " + record.value());
    }
}

Upvotes: 0

Related Questions