tsar2512

Reputation: 2994

Understanding Kafka Consumer API for Java

I want to understand the Kafka consumer API. I have included sample code that works.

  1. Why does consumerStreamsMap.get(topic) return a list of KafkaStream<> receivers for a single topic?
  2. The current code iterates through the List<KafkaStream<>> and then through the messages of each stream. But a Kafka receiver is supposed to run forever, so I would expect the inner while loop to never exit. That would make the List<KafkaStream<>> redundant.
  3. A few examples also use consumerStreamsMap.get(topic).get(0). Is this the right way to write a consumer then?

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // Request a single stream (one consumer thread) for the topic
        topicMap.put(topicName, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);

        for (final KafkaStream<byte[], byte[]> stream : streamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            // hasNext() blocks while no message is available (unless consumer.timeout.ms is set)
            while (consumerIte.hasNext()) {
                counter++;
                String message = new String(consumerIte.next().message());
                String id = topic.hashCode() + "-" + date.getTime() + "-" + counter;
                System.out.println(message);
            }
        }
    

Upvotes: 1

Views: 1472

Answers (1)

Anatoly Deyneka

Reputation: 1258

You can find the answers in the Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

  1. consumerStreamsMap is a map of (topic, list of KafkaStream) pairs. The number of streams depends on the following line in your code:

    topicMap.put(topicName, numberOfStreams);
    

If you provide more threads than there are partitions on the topic, some threads will never see a message. If you have more partitions than threads, some threads will receive data from multiple partitions. If you have multiple partitions per thread, there is NO guarantee about the order in which you receive messages, other than that offsets will be sequential within each partition. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10, even if partition 11 has data available. Adding more processes/threads will cause Kafka to rebalance, possibly changing the assignment of a partition to a thread.
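To make the thread/partition arithmetic concrete, here is a minimal sketch in plain Java (no Kafka dependency). It assumes a single consumer process and an even, range-style split of partitions across threads; the class and method names are made up for illustration, and this is not Kafka's actual assignment code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: splits partition ids 0..numPartitions-1
// across numThreads as evenly as possible, in contiguous ranges.
public class PartitionAssignmentSketch {

    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numThreads;   // partitions every thread gets
        int extra = numPartitions % numThreads;  // first 'extra' threads get one more
        int next = 0;
        for (int t = 0; t < numThreads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // More threads than partitions: the last thread owns nothing.
        System.out.println(assign(2, 3)); // [[0], [1], []]
        // More partitions than threads: each thread reads several partitions.
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]
    }
}
```

The empty list in the first case corresponds to a thread that "will never see a message"; the multi-element lists in the second case are threads whose messages may interleave across partitions.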

  2. You need to iterate over each stream in its own thread, because each stream's iterator blocks while it waits for messages:

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Create a pool with one thread per requested stream
        executor = Executors.newFixedThreadPool(a_numThreads);

        // Hand each stream to its own worker
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    public class ConsumerTest implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;

        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            // Loops until the consumer is shut down
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
    
  3. consumerStreamsMap.get(topic).get(0) is correct only if you have one topic and one stream.
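The one-thread-per-stream point can be tried without a broker. The sketch below is a stand-in, not Kafka code: each "stream" is a java.util.concurrent.BlockingQueue whose take() blocks the way a stream iterator does when no data is available. The class name, the consumeAll helper, and the END marker are all hypothetical and exist only so the demo can terminate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class StreamPerThreadSketch {
    // Hypothetical end-of-stream marker, used only so this demo can terminate.
    static final String END = "__END__";

    // Drains every queue on its own thread and returns the total message count.
    static int consumeAll(List<BlockingQueue<String>> streams) {
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        try {
            List<Future<Integer>> counts = new ArrayList<>();
            for (BlockingQueue<String> stream : streams) {
                counts.add(executor.submit(() -> {
                    int seen = 0;
                    // take() blocks while the queue is empty
                    while (!stream.take().equals(END)) {
                        seen++;
                    }
                    return seen;
                }));
            }
            int total = 0;
            for (Future<Integer> count : counts) {
                total += count.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> first = new LinkedBlockingQueue<>();
        BlockingQueue<String> second = new LinkedBlockingQueue<>();
        first.add("a");
        first.add(END);
        second.add("b");
        second.add("c");
        second.add(END);

        List<BlockingQueue<String>> streams = new ArrayList<>();
        streams.add(first);
        streams.add(second);
        System.out.println(consumeAll(streams)); // 3
    }
}
```

If a single thread walked the list sequentially and the first queue never received END, the second queue would never be read. The same reasoning suggests that get(0) silently ignores any additional streams you requested.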

Upvotes: 4
