Ayush Pandey

Reputation: 168

Kafka Consumer: controlled reading from topic

I have the Kafka consumer code below, in which 3 threads read from a Kafka topic that has 3 partitions.

Is there any way to read a new message from the Kafka topic only after the messages currently being processed by the threads have finished processing?

For example, let's say there are 100 messages in the topic. Is there any way to read and process only 3 messages at a time, so that the next 3 messages are read only once these 3 have been processed, and so on?

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;

import kafka.consumer.KafkaStream;

public void run(int a_numThreads) {
    // Ask the high-level consumer for a_numThreads streams on the topic
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // Launch one thread per stream
    executor = Executors.newFixedThreadPool(a_numThreads);

    // Create an object to consume the messages of each stream
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

Upvotes: 2

Views: 1508

Answers (2)

Kaushal

Reputation: 1359

If the iterator inside ConsumerTest processes each message synchronously, then only 3 messages (one per thread) will be consumed at a time. enable.auto.commit is true by default. Make sure you do not set it to false; otherwise you need to add logic for committing offsets.

For example:

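 // Inside ConsumerTest.run(); needs kafka.consumer.ConsumerIterator and java.nio.charset.StandardCharsets
 ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
 while (streamIterator.hasNext()) {
     String kafkaMsg = new String(streamIterator.next().message(), StandardCharsets.UTF_8);
     // Process kafkaMsg here; the loop moves to the next message only after this returns
 }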
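To illustrate why synchronous processing caps the work at one message per thread, here is a minimal stdlib-only simulation. No Kafka is involved: the in-memory queues stand in for the 3 partitions, and `process()` is a hypothetical placeholder for the real work. It counts how many messages are being processed at the same moment. Note that this models processing only; the real Kafka client may still fetch messages ahead of time internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SyncConsumerDemo {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();
    static final AtomicInteger processed = new AtomicInteger();

    // Hypothetical stand-in for the work done on each message
    static void process(String msg) throws InterruptedException {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        Thread.sleep(1);
        processed.incrementAndGet();
        inFlight.decrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        int numThreads = 3;
        // One in-memory queue per simulated partition; 100 messages spread round-robin
        List<BlockingQueue<String>> partitions = new ArrayList<>();
        for (int p = 0; p < numThreads; p++) partitions.add(new LinkedBlockingQueue<>());
        for (int i = 0; i < 100; i++) partitions.get(i % numThreads).add("msg-" + i);

        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (BlockingQueue<String> partition : partitions) {
            executor.submit(() -> {
                String msg;
                // Like the while (hasNext()) loop: take the next message only after process() returns
                while ((msg = partition.poll()) != null) {
                    process(msg);
                }
                return null;
            });
        }
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println("processed=" + processed.get() + ", maxInFlight=" + maxInFlight.get());
    }
}
```

With 3 threads, `maxInFlight` never exceeds 3, because each thread asks for its next message only after finishing the current one.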

Upvotes: 3

Nikolas

Reputation: 2452

Well, consumers do not know about each other by default, so they cannot "sync" their work. What you could do is either wrap your three messages into one (thus guaranteeing that they are all handled together, in order) or introduce more ("sub") topics.
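A minimal sketch of the "wrap your three messages into one" option, assuming the producer controls the payload format. The `BatchCodec` class and its `wrap`/`unwrap` helpers are hypothetical names; the encoding (a count followed by length-prefixed strings via `DataOutputStream.writeUTF`) is just one simple choice, limited to 65535 encoded bytes per inner message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BatchCodec {
    // Pack several logical messages into one Kafka payload: a count, then each message
    static byte[] wrap(List<String> messages) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(messages.size());
            for (String m : messages) {
                out.writeUTF(m); // length-prefixed modified UTF-8
            }
        }
        return bytes.toByteArray();
    }

    // Unpack on the consumer side, preserving the original order
    static List<String> unwrap(byte[] payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            int count = in.readInt();
            List<String> messages = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                messages.add(in.readUTF());
            }
            return messages;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = wrap(List.of("order-1", "order-2", "order-3"));
        System.out.println(unwrap(payload)); // prints [order-1, order-2, order-3]
    }
}
```

Because the three logical messages travel as one Kafka record, a single consumer receives and processes them together, so no cross-consumer coordination is needed.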

Another possibility (if you really need to guarantee that your three messages are consumed by individual consumers) is to have all your consumers sync their work, or to have them notify a controller that tracks the work.
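A minimal sketch of the "consumers sync their work" idea, assuming all consumers are threads in one JVM, as in the question's code. The coordination mechanism here, `java.util.concurrent.CyclicBarrier`, is my choice; the answer does not name one. Consumers in separate processes would need an external coordinator instead, which this does not show. Each thread processes one message, then waits at the barrier, so the next round of 3 starts only after all 3 are done. The sleep is a hypothetical stand-in for real processing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LockstepConsumers {
    // Records "start:<round>" and "end:<round>" events in the order they happen
    static final List<String> events = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) throws Exception {
        int consumers = 3;
        int rounds = 4; // e.g. 12 messages, 3 per round
        // All consumers wait here after each message until every one has finished its message
        CyclicBarrier barrier = new CyclicBarrier(consumers);
        ExecutorService executor = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            executor.submit(() -> {
                for (int round = 0; round < rounds; round++) {
                    events.add("start:" + round);
                    Thread.sleep(5); // stand-in for processing one message
                    events.add("end:" + round);
                    barrier.await(); // take the next message only when all consumers are done
                }
                return null;
            });
        }
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println(events.size() + " events recorded");
    }
}
```

The trade-off is that the slowest consumer sets the pace for every round, since the others sit idle at the barrier until it finishes.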

But to be honest, it feels like you are "doing it wrong": messages in a queue are meant to be stateless, and only their order within a topic determines the order in which they should be processed. WHEN the messages are processed should not matter.

Upvotes: 2
