Reputation: 9028
I have been using Kafka 0.8 and am trying to move to Kafka 0.10.
We have a topic with 10 partitions and used to create a consumer group with 10 consumers, as shown below.
public void run(int a_numThreads) {
    // ask the old high-level consumer for one stream per thread
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    // now launch all the threads
    executor = Executors.newFixedThreadPool(a_numThreads);
    // now create an object to consume the messages
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.execute(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}
Here, we used to pass the number of threads based on the number of partitions.
But with the Kafka 0.10 consumer, I am not sure whether there is anything like that. It doesn't return streams based on partitions.
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "");
    props.put("", "group-1");
    props.put("", "true");
    props.put("", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
    while (true) {
        // poll for new records, waiting up to 100 ms
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
        }
    }
}
Thanks in Advance
Upvotes: 0
Views: 99
Reputation: 7089
The new consumer enables a simple and efficient implementation that can handle all IO from a single thread. That's quite different from the old consumer. See this blog for further details:
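To relate this to the thread count you used to pass to `createMessageStreams`: with the new consumer, parallelism usually comes from running several `KafkaConsumer` instances that share one group id (typically one instance per thread, since a `KafkaConsumer` is not safe for multi-threaded access), and the group divides the topic's partitions among them. The sketch below does not talk to a broker. It is only a plain-Java illustration of a range-style split, assuming a single topic with partitions numbered from 0. `PartitionSplit` and `assign` are hypothetical names, not Kafka API, and the real assignment is made by whichever assignor the group is configured with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Kafka client API.
final class PartitionSplit {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per consumer.
    // The first (numPartitions % numConsumers) consumers each get one extra partition.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numPartitions < 0 || numConsumers <= 0) {
            throw new IllegalArgumentException("need numPartitions >= 0 and numConsumers > 0");
        }
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 10)); // one partition per consumer
        System.out.println(assign(10, 4));  // uneven split across four consumers
        System.out.println(assign(10, 12)); // more consumers than partitions
    }
}
```

With 10 partitions, 10 consumers in the group get one partition each, which matches the old 10-thread setup. Consumers beyond the partition count receive nothing and sit idle, so starting more than 10 gains no extra parallelism.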
Upvotes: 2