Reputation: 85
I have a single Kafka broker with multiple topics, each having a single partition.
I have a consumer that works just fine consuming the messages from the topic.
My problem is that I need to improve the throughput of the message queue by increasing the number of partitions. Say I have four partitions on a topic: is there a way I can write four consumers, each pointed at an individual partition of the topic?
import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    private ConsumerConnector consumerConnector = null;
    private final String topic = "mytopic";

    public void initialize() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "testgroup");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "300");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConfig conConfig = new ConsumerConfig(props);
        consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
    }

    public void consume() {
        // Key = topic name, Value = No. of threads for topic
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        // ConsumerConnector creates the message stream for each topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumerConnector.createMessageStreams(topicCount);
        // Get Kafka stream for topic 'mytopic'
        List<KafkaStream<byte[], byte[]>> kStreamList =
                consumerStreams.get(topic);
        // Iterate stream using ConsumerIterator
        for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();
            while (consumerIte.hasNext())
                System.out.println("Message consumed from topic [" + topic + "] : " +
                        new String(consumerIte.next().message()));
        }
        // Shutdown the consumer connector
        if (consumerConnector != null) consumerConnector.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer kafkaConsumer = new KafkaConsumer();
        // Configure Kafka consumer
        kafkaConsumer.initialize();
        // Start consumption
        kafkaConsumer.consume();
    }
}
Upvotes: 4
Views: 8136
Reputation: 165
Essentially, all you need to do is start several consumers that are all in the same consumer group. If you are using the new consumer from Kafka 0.9 or later, or the high-level consumer, Kafka takes care of dividing up the partitions so that each partition is read by exactly one consumer in the group.
If you have more partitions than consumers, some consumers will receive messages from multiple partitions, but no partition will ever be read by more than one consumer from the same consumer group, which ensures messages are not duplicated. For the same reason, you never want more consumers than partitions, since the extra consumers will sit idle.
You can also fine-tune which consumer reads each partition using the simple consumer: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
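To make the partition-to-consumer relationship concrete, here is a small standalone sketch that spreads the partitions of one topic across the consumers of a group. It is only an illustration of the rule described above, not Kafka's own assignment code; the class name `PartitionAssignmentSketch` and the method `assign` are made up for this example, and the contiguous-range split is an assumption chosen for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics how the partitions of a single topic could be
// spread over the consumers of one group. Not Kafka's actual implementation.
public class PartitionAssignmentSketch {

    // Returns, for each consumer index, the list of partition ids it owns.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numConsumers;   // partitions every consumer gets
        int extra = numPartitions % numConsumers;  // first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 4)); // one partition per consumer
        System.out.println(assign(4, 2)); // each consumer reads two partitions
        System.out.println(assign(4, 5)); // the fifth consumer owns nothing and idles
    }
}
```

With four partitions and four consumers, every consumer ends up with exactly one partition, which is the setup asked about in the question; a fifth consumer would get an empty list.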
It seems you are using the old consumer from Kafka 0.8 or before. You may want to consider switching to the new consumer. http://kafka.apache.org/documentation.html#intro_consumers
Here is another good article with detailed examples of writing consumers using the new consumer: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
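As a rough sketch of what changes in the configuration when moving to the new consumer: it connects to the brokers directly through `bootstrap.servers` instead of `zookeeper.connect`, and it needs deserializer classes. The helper below only builds the `Properties` object using the standard library; the broker address `localhost:9092` and the class name `NewConsumerConfigSketch` are assumptions for illustration. In a real program each of the four consumers would pass such a `Properties` object to the new consumer's constructor and subscribe to the topic.

```java
import java.util.Properties;

// Illustrative only: builds the settings that each consumer in the group
// would share. Creating the actual consumer requires the kafka-clients library.
public class NewConsumerConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // The new consumer talks to brokers directly, not to ZooKeeper.
        props.put("bootstrap.servers", bootstrapServers);
        // Every consumer that should share the partitions uses the same group id.
        props.put("group.id", groupId);
        // Messages are read as raw bytes, as in the question's code.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address; "testgroup" is the group id from the question.
        Properties props = consumerProps("localhost:9092", "testgroup");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

Starting four processes or threads, each with its own consumer built from the same `group.id`, is what lets Kafka hand one of the four partitions to each of them.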
Upvotes: 3