Reputation: 2994
I want to understand the Kafka consumer API. I have included sample code that works.
A few examples also use consumerStreamMap.get(topic).get(0). Is that the right way to write a consumer?
    // consumer, topicName, counter and date are assumed to be declared elsewhere in the class
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    // Request a single stream (one consuming thread) for the topic
    topicMap.put(topicName, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
    List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topicName);
    for (final KafkaStream<byte[], byte[]> stream : streamList)
    {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (consumerIte.hasNext())
        {
            counter++;
            String message = new String(consumerIte.next().message());
            String id = topicName.hashCode() + "-" + date.getTime() + "-" + counter;
            System.out.println(message);
        }
    }
Upvotes: 1
Views: 1472
Reputation: 1258
You can find answers in the Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
consumerStreamMap is a map of (topic, list of KafkaStream) pairs. The number of streams in each list depends on the following line in your code:
    topicMap.put(topicName, numberOfStreams);
If you provide more threads than there are partitions on the topic, some threads will never see a message. If you have more partitions than you have threads, some threads will receive data from multiple partitions. If you have multiple partitions per thread, there is NO guarantee about the order in which you receive messages, other than that within a partition the offsets will be sequential. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10, even if partition 11 has data available. Adding more processes/threads will cause Kafka to rebalance, possibly changing the assignment of a partition to a thread.
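To make the threads-versus-partitions cases concrete, here is a small sketch that spreads partition ids over threads with an even, range-style split. It is a simplified model only, not Kafka's rebalancing code; the class name PartitionSpread and the assign method are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading partitions over consumer threads.
// Illustration only: this is NOT Kafka's actual rebalancing code.
class PartitionSpread {

    // Returns, for each thread, the partition ids it would own under an
    // even, range-style split (an assumption of this sketch).
    static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> owned = new ArrayList<>();
        int base = partitions / threads;   // every thread gets at least this many
        int extra = partitions % threads;  // the first `extra` threads get one more
        int next = 0;
        for (int t = 0; t < threads; t++) {
            int count = base + (t < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                mine.add(next++);
            }
            owned.add(mine);
        }
        return owned;
    }

    public static void main(String[] args) {
        // More threads than partitions: the last two threads own nothing.
        System.out.println(assign(2, 4)); // [[0], [1], [], []]
        // More partitions than threads: each thread owns several partitions.
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]
    }
}
```

In this model a thread with an empty list corresponds to a thread that never sees a message, and a thread with several ids reads from several partitions with no ordering guarantee across them.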
You need to iterate over each stream in its own thread:
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Ask for one stream per consuming thread
        topicCountMap.put(topic, a_numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Now launch all the threads
        // (executor is an ExecutorService field of the enclosing class)
        executor = Executors.newFixedThreadPool(a_numThreads);

        // Now create one worker object per stream to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class ConsumerTest implements Runnable {
        private KafkaStream<byte[], byte[]> m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            // hasNext() blocks while waiting for messages; the loop ends once the consumer is shut down
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
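If you want to try the one-thread-per-stream pattern without a broker, the sketch below models it with plain JDK classes. Each "stream" is a BlockingQueue standing in for a KafkaStream, and the END marker is a hypothetical substitute for the consumer being shut down; none of these names come from the Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in model: a BlockingQueue plays the role of a KafkaStream.
// No Kafka classes are used; all names here are hypothetical.
class StreamWorkers {
    static final String END = "__END__"; // stand-in for "consumer was shut down"

    // Drains every stream in its own thread and returns the total message count.
    static int consumeAll(List<BlockingQueue<String>> streams) {
        AtomicInteger seen = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        for (BlockingQueue<String> stream : streams) {
            executor.submit(() -> {
                try {
                    String m;
                    // take() blocks on an empty queue, like waiting on a stream
                    while (!(m = stream.take()).equals(END)) {
                        seen.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Stop accepting work, then wait for the workers to finish.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    // Builds two stand-in streams holding 2 and 3 messages.
    static int demo() {
        List<BlockingQueue<String>> streams = new ArrayList<>();
        streams.add(new LinkedBlockingQueue<String>(Arrays.asList("a", "b", END)));
        streams.add(new LinkedBlockingQueue<String>(Arrays.asList("c", "d", "e", END)));
        return consumeAll(streams);
    }
}
```

Calling StreamWorkers.demo() counts the messages from both queues. With a real consumer you would shut the connector down as well as the executor, so that the blocked iterators return.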
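consumerStreamMap.get(topic).get(0) is correct only if you have one topic and one stream: it returns just the first stream for that topic, so any additional streams you requested would never be read.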
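As a rough illustration of the get(0) point, the sketch below builds the same map-of-lists shape with plain lists standing in for streams and compares what get(0) reaches with what a full iteration reaches. The class FirstStreamOnly, the topic name "events" and the sample messages are invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in model: each "stream" is just a List<String> of messages.
// No Kafka classes are used; all names here are hypothetical.
class FirstStreamOnly {

    // Messages reachable through streams.get(topic).get(0).
    static int readFirst(Map<String, List<List<String>>> streams, String topic) {
        return streams.get(topic).get(0).size();
    }

    // Messages reachable when every stream of the topic is iterated.
    static int readAll(Map<String, List<List<String>>> streams, String topic) {
        int total = 0;
        for (List<String> stream : streams.get(topic)) {
            total += stream.size();
        }
        return total;
    }

    // One topic that was asked for three streams.
    static Map<String, List<List<String>>> sample() {
        List<List<String>> threeStreams = new ArrayList<>();
        threeStreams.add(Arrays.asList("a", "b"));
        threeStreams.add(Arrays.asList("c"));
        threeStreams.add(Arrays.asList("d", "e", "f"));
        Map<String, List<List<String>>> streams = new HashMap<>();
        streams.put("events", threeStreams);
        return streams;
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> streams = sample();
        System.out.println(readFirst(streams, "events")); // 2
        System.out.println(readAll(streams, "events"));   // 6
    }
}
```

With a single requested stream the two numbers are the same, which is the only case where get(0) reaches everything.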
Upvotes: 4