Reputation: 2611
I am using Kafka Streams for an application. The stream flow is like below:
kafkaProducer ----> streamConsumer1 ----> finalConsumer
I have a producer which writes data very fast, and my stream consumer maps each record through some processing and forwards it to another topic.
In the map step I added my own mapper function, which tries to persist the relevant data, like below:
public void checkRecord(T1 key, T2 value) {
    switch (key.toString()) {  // switch on the key itself, not the type parameter T1
        case "key1":
            // Get relevant fields from value and persist in DB
            break;
        case "key2":
            // Get relevant fields from value and persist in DB
            break;
    }
}
KStream<String, KafkaStatusRecordWrapper>[] pDStream = myStream
        .map(this::checkRecord)
        .branch((key, value) -> value.isSuccess(),
                (key, value) -> !value.isSuccess());

pDStream[0].mapValues(value -> transformer(value))
           .to("other_topic", Produced.with(stringSerde, stringSerde));
Now my checkRecord mapper is single-threaded, and it takes almost 300 ms to return (due to some business logic and a DB persist that I cannot avoid).
I cannot increase the number of partitions because of limitations in our infra, and also due to the constraints below:
More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency
So I am planning to write a multi-threaded stream consumer, but I am concerned about the points below.
So how do I increase throughput?
I have enough resources on my consumer; only 40% of them are used.
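The multi-threaded idea from the question can be sketched with a plain JDK thread pool (a hedged illustration, not Kafka Streams API: the class name, pool size, and the 50 ms stand-in for the ~300 ms persist are all assumptions). The point is to hand the slow persist to a bounded executor so several records persist concurrently, and to wait for all futures before committing offsets, since committing earlier risks losing records on a crash:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelPersistSketch {

    // Stand-in for the slow business logic + DB persist (assumed 50 ms here).
    static void persist(String key) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Persist all keys on a bounded pool; returns elapsed wall time in ms.
    static long runParallel(List<String> keys, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        CompletableFuture<?>[] futures = keys.stream()
                .map(k -> CompletableFuture.runAsync(() -> persist(k), pool))
                .toArray(CompletableFuture[]::new);
        // Block until every persist finished -- only then is it safe
        // to commit the consumed offsets.
        CompletableFuture.allOf(futures).join();
        pool.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // Four 50 ms persists on four threads overlap, so the total
        // is close to one persist, not four in sequence.
        System.out.println("elapsedMs="
                + runParallel(List.of("key1", "key2", "key3", "key4"), 4));
    }
}
```

Note the trade-off: work handed to a pool is no longer processed in partition order, which matters if later records depend on earlier ones for the same key.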
Upvotes: 0
Views: 134
Reputation: 20830
You can set the stream configuration num.stream.threads
to configure the number of threads. The maximum useful value is the number of partitions; it increases the parallelism of the application instance.
Say your topic has 4 partitions; then you can set the following:
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
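For context, a minimal sketch of where that property sits in the streams setup (the application id, bootstrap server, and builder wiring are assumptions, not from the question):

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-app");      // assumed id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
// One thread per partition is the useful maximum: each stream thread
// runs its own consumer, and threads beyond the partition count sit idle.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

StreamsBuilder builder = new StreamsBuilder();
// ... build the map / branch / to topology on builder ...
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
```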
Upvotes: 2