scoder

Reputation: 2611

Kafka: increasing throughput with multiple partitions and multiple consumer threads

I am using Kafka Streams in an application.

The stream flow looks like this:

kafkaProducer ----> StreamConsumer1 ----> finalConsumer

I have a producer that writes data very fast. My StreamConsumer maps each record through some processing and forwards the result to another topic.

In the StreamConsumer's map step I added my own mapper function, which tries to persist the relevant data, like below:

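public void checkRecord(T1 key, T2 value) {
    // Switch on the record's key (the instance), not on the type parameter T1
    switch (key.toString()) {
        case "key1":
            // Get the relevant fields from value and persist them in the DB
            break;
        case "key2":
            // Get the relevant fields from value and persist them in the DB
            break;
    }
}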


KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());

pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));   

My checkRecord function runs on a single thread and takes almost 300 ms to return, because of business logic and a DB write that I cannot avoid.

I cannot increase the number of partitions because of a limitation in our infrastructure, and also because of the following trade-offs:

More partitions require more open file handles
More partitions may increase unavailability
More partitions may increase end-to-end latency

So I am planning to write a multi-threaded stream consumer.

But I am concerned about the following points:

  1. I need to process each record exactly once.
  2. Handing records off to another thread will cause problems with offset management.

So how can I increase throughput?

My consumer host has enough resources; only 40% of them are in use.

Upvotes: 0

Views: 134

Answers (1)

Nishu Tayal

Reputation: 20830

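You can set the Kafka Streams configuration num.stream.threads to control how many processing threads each application instance runs. The useful maximum is the number of partitions of the input topic, because each partition is processed by at most one thread, so any threads beyond that count stay idle. Raising the value increases the parallelism of the application instance.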
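To get a feel for what this buys you, here is a rough back-of-the-envelope sketch. It assumes the 300 ms per record from the question, that records are spread evenly across partitions, and that the database can absorb the parallel writes. The class and method names are made up for illustration and are not part of Kafka.

```java
public class ThreadEstimate {

    // Threads beyond the partition count have no partition to work on,
    // so the number of busy threads is capped by the partition count.
    static int effectiveThreads(int configuredThreads, int partitions) {
        return Math.min(configuredThreads, partitions);
    }

    // Rough upper bound on records per second when every record blocks
    // its thread for perRecordMillis (assumes an even spread of records).
    static double recordsPerSecond(int activeThreads, long perRecordMillis) {
        return activeThreads * (1000.0 / perRecordMillis);
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 8 configured threads, 4 partitions, 300 ms per record.
        int active = effectiveThreads(8, 4);
        System.out.println("active threads: " + active);
        System.out.println("approx. records/s: " + recordsPerSecond(active, 300));
    }
}
```

So with 4 partitions and 4 threads you would expect roughly 13 records per second instead of about 3, and configuring 8 threads would not help further.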

For example, if your topic has 4 partitions, you can set the following:

properties.put("num.stream.threads", 4);
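For context, a minimal sketch of where that setting sits among the rest of the configuration might look like the following. The application id and broker address are placeholders I made up, and I use plain string keys so the snippet compiles without the Kafka jars; in a real project you would normally use the matching StreamsConfig constants.

```java
import java.util.Properties;

public class StreamThreadsConfig {

    // Builds the Streams configuration with one thread per input partition.
    static Properties buildConfig(int partitions) {
        Properties props = new Properties();
        props.put("application.id", "record-checker");     // placeholder name
        props.put("bootstrap.servers", "localhost:9092");  // placeholder address
        props.put("num.stream.threads", partitions);       // one thread per partition
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig(4);
        System.out.println("num.stream.threads = " + props.get("num.stream.threads"));
    }
}
```

You would then pass these properties to the KafkaStreams constructor together with your topology. Each stream thread commits offsets for the partitions it owns, so you do not have to hand records to your own worker threads.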

Upvotes: 2
