My consumer configuration follows the Spring Cloud Stream consumer properties documentation.
spring-cloud-dependencies:Finchley.SR1
springBootVersion = '2.0.5.RELEASE'
I have 4 partitions for the kstream_test topic, and they are filled with messages from the producer, as seen below:
root@kafka:/# kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic kstream_test --time -1
kstream_test:2:222
kstream_test:1:203
kstream_test:3:188
kstream_test:0:278
My Spring Cloud Stream Kafka binder configuration is:
spring.cloud.stream.bindings.input:
  destination: kstream_test
  group: consumer-group-G1_test
  consumer:
    useNativeDecoding: true
    headerMode: raw
    startOffset: latest
    partitioned: true
    concurrency: 3
KStream listener class:
@StreamListener
@SendTo(MessagingStreams.OUTPUT)
public KStream<?, ?> process(@Input(MessagingStreams.INPUT) KStream<?, ?> kstreams) {
    ......
    log.info("Got a message");
    ......
    return kstreams;
}
My producer sends 100 messages in one run, but the logs show only one thread, StreamThread-1, handling the messages, even though I have concurrency set to 3. What might be wrong here? Are 100 messages not enough to see the concurrency at play?
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.923 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.945 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.956 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
2018-10-18 11:50:01.972 INFO 10228 --- [-StreamThread-1] c.c.c.s.KStreamHandler : Got a message
UPDATE:
As per the answer, the num.stream.threads configuration below works at the binder level.
spring.cloud.stream.kafka.streams.binder.configuration:
  num.stream.threads: 3
Upvotes: 1
Views: 1838
It seems that the num.stream.threads property needs to be set to increase the concurrency...
/** {@code num.stream.threads} */
@SuppressWarnings("WeakerAccess")
public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
...it defaults to 1.
The binder should really set that based on the ...consumer.concurrency property; please open a GitHub issue to that effect against the binder.
In the meantime, you can just set that property directly in ...consumer.configuration.
CORRECTION
I've just been told that the ...consumer.configuration is not currently applied to the streams binder either; you would have to set it at the binder level.
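For reference, outside the binder num.stream.threads is just one more entry in the properties handed to Kafka Streams. Here is a minimal sketch, assuming a plain Kafka Streams application. It uses only java.util.Properties so it runs without the Kafka jars; the class name, the method name and the application.id value are made up for illustration, and localhost:9092 is taken from the question.

```java
import java.util.Properties;

public class StreamThreadsConfigSketch {

    // Same key that StreamsConfig.NUM_STREAM_THREADS_CONFIG holds.
    static final String NUM_STREAM_THREADS = "num.stream.threads";

    // Builds the properties a plain Kafka Streams app would be started with.
    // "kstream-test-app" is a hypothetical application.id, not from the question.
    static Properties streamsProperties(int threads) {
        Properties props = new Properties();
        props.put("application.id", "kstream-test-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Kafka Streams falls back to 1 thread when this key is absent.
        props.put(NUM_STREAM_THREADS, Integer.toString(threads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties(3);
        System.out.println(NUM_STREAM_THREADS + "=" + props.getProperty(NUM_STREAM_THREADS));
    }
}
```

In a real application these properties would be passed to the KafkaStreams constructor together with the topology. With the streams binder you don't build them yourself, which is why the property has to go into the binder-level configuration.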
Upvotes: 2