ram

Reputation: 85

Kafka Consumer with Java

I have a single Kafka broker with multiple topics, each having a single partition.

I have a consumer that works just fine consuming messages from the topic.

My problem is that I need to improve the throughput of the message queue by increasing the number of partitions. Say I have four partitions on a topic: is there a way I can write four consumers, each pointed at an individual partition of the topic?

import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
   private ConsumerConnector consumerConnector = null;
   private final String topic = "mytopic";

   public void initialize() {
         Properties props = new Properties();
         props.put("zookeeper.connect", "localhost:2181");
         props.put("group.id", "testgroup");
         props.put("zookeeper.session.timeout.ms", "400");
         props.put("zookeeper.sync.time.ms", "300");
         props.put("auto.commit.interval.ms", "1000");
         ConsumerConfig conConfig = new ConsumerConfig(props);
         consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
   }

   public void consume() {
         //Key = topic name, Value = No. of threads for topic
         Map<String, Integer> topicCount = new HashMap<String, Integer>();       
         topicCount.put(topic, 1);

         //ConsumerConnector creates the message stream for each topic
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
               consumerConnector.createMessageStreams(topicCount);         

         // Get Kafka stream for topic 'mytopic'
         List<KafkaStream<byte[], byte[]>> kStreamList =
                                              consumerStreams.get(topic);
         // Iterate stream using ConsumerIterator
         for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

                while (consumerIte.hasNext())
                       System.out.println("Message consumed from topic [" + topic + "] : " +
                                       new String(consumerIte.next().message()));
         }
         //Shutdown the consumer connector
         if (consumerConnector != null)   consumerConnector.shutdown();          
   }

   public static void main(String[] args) throws InterruptedException {
         KafkaConsumer kafkaConsumer = new KafkaConsumer();
         // Configure Kafka consumer
         kafkaConsumer.initialize();
         // Start consumption
         kafkaConsumer.consume();
   }

}

Upvotes: 4

Views: 8136

Answers (1)

Lev Stefanovich

Reputation: 165

Essentially, all you need to do is start several consumers that are all in the same consumer group. If you are using the new consumer from Kafka 0.9 or later, or the high-level consumer, Kafka takes care of dividing up the partitions so that each partition is read by exactly one consumer in the group. If you have more partitions than consumers, some consumers will receive messages from multiple partitions, but no partition will ever be read by more than one consumer from the same consumer group, which ensures messages are not duplicated. For the same reason, you never want more consumers than partitions, since the extra consumers will sit idle. You can also fine-tune which consumer reads each partition by using the simple consumer: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
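To make the partition-to-consumer behaviour described above concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Kafka client and is not Kafka's actual assignment algorithm; the class `PartitionAssignmentSketch` and the method `assign` are hypothetical names that simply deal partitions out round-robin, so you can see the three cases: as many consumers as partitions, fewer consumers than partitions, and more consumers than partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a simplified stand-in for what a consumer group does.
// Kafka's real assignors are more involved; this is an assumption-laden sketch.
class PartitionAssignmentSketch {

    // Hypothetical helper: spreads partitions 0..numPartitions-1 over
    // consumers 0..numConsumers-1 so each partition has exactly one owner.
    static Map<Integer, List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int consumer = 0; consumer < numConsumers; consumer++) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Round-robin: partition p goes to consumer (p mod numConsumers).
        for (int partition = 0; partition < numPartitions; partition++) {
            assignment.get(partition % numConsumers).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Four partitions, four consumers: one partition each.
        System.out.println("4 partitions, 4 consumers: " + assign(4, 4));
        // Four partitions, two consumers: each consumer reads two partitions.
        System.out.println("4 partitions, 2 consumers: " + assign(4, 2));
        // Four partitions, six consumers: two consumers get nothing and sit idle.
        System.out.println("4 partitions, 6 consumers: " + assign(4, 6));
    }
}
```

In the last case, consumers 4 and 5 end up with empty lists, which is why running more consumers than partitions in one group buys you nothing.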

It seems you are using the old consumer from Kafka 0.8 or earlier. You may want to consider switching to the new consumer: http://kafka.apache.org/documentation.html#intro_consumers

Here is another good article with detailed examples of writing consumers using the new consumer: http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

Upvotes: 3
