Reputation: 595
@StreamListener(value = PersonStream.INPUT)
private void personBulkReceiver(List<Person> person) {
    //....
}
spring:
  cloud:
    stream:
      kafka:
        binders:
          bulkKafka:
            type: kafka
            environment:
              spring:
                cloud:
                  stream:
                    kafka:
                      binder:
                        brokers: localhost:9092
                        configuration:
                          max.poll.records: 1500
                          fetch.min.bytes: 10000000
                          fetch.max.wait.ms: 10000
                          value.deserializer: tr.cloud.stream.examples.PersonDeserializer
      bindings:
        person-topic-in:
          binder: bulkKafka
          destination: person-topic
          contentType: application/person
          group: person-group
          consumer:
            batch-mode: true
I'm using Spring Cloud Stream with Kafka. When the topic has a single partition, my StreamListener consumes records in batch mode every 5000 ms.
My .yml configuration sets fetch.min.bytes = 10000000, fetch.max.wait.ms = 50000 and max.poll.records = 1500, as shown above.
I receive a batch every 5000 ms because the batch size doesn't exceed 10000000 bytes.
But when the topic has more than one partition, the StreamListener receives records sooner than every 5000 ms.
Is there any configuration for this case?
Or is this the natural result of independent threads working on each partition?
What is different in the working logic when the partition count is more than 1?
Upvotes: 1
Views: 494
Reputation: 174574
According to your readme:
"And there is always a lot of data on the topic."
That doesn't match your question, where you said:
"I can receive batch records in every 5000 ms. since batch record size doesn't exceed 10000000 bytes."
When more than fetch.min.bytes of data is available, the broker returns it to the client immediately instead of waiting for fetch.max.wait.ms to elapse.
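To make the interaction between the two settings concrete, here is a rough model of the broker's decision. This is not Kafka code: FetchModel and shouldRespond are hypothetical names, and the model ignores details such as per-partition limits. It only assumes the documented rule that a fetch is answered once enough bytes are available or the wait time is used up.

```java
// Simplified model of when a broker answers a fetch request.
// FetchModel and shouldRespond are hypothetical names, not Kafka APIs.
final class FetchModel {

    /**
     * @param availableBytes bytes currently available for the fetch
     * @param waitedMs       how long the broker has held the request so far
     * @param fetchMinBytes  consumer setting fetch.min.bytes
     * @param fetchMaxWaitMs consumer setting fetch.max.wait.ms
     * @return true if the broker would answer the fetch now
     */
    static boolean shouldRespond(long availableBytes, long waitedMs,
                                 long fetchMinBytes, long fetchMaxWaitMs) {
        return availableBytes >= fetchMinBytes || waitedMs >= fetchMaxWaitMs;
    }

    public static void main(String[] args) {
        long minBytes = 10_000_000L; // fetch.min.bytes from the question
        long maxWaitMs = 10_000L;    // fetch.max.wait.ms from the YAML in the question

        // Little data available: the request is held until the wait expires.
        System.out.println(shouldRespond(2_000_000L, 3_000L, minBytes, maxWaitMs));  // false
        System.out.println(shouldRespond(2_000_000L, 10_000L, minBytes, maxWaitMs)); // true

        // A lot of data available: the request is answered at once.
        System.out.println(shouldRespond(12_000_000L, 0L, minBytes, maxWaitMs));     // true
    }
}
```

So with a busy topic the wait time never comes into play, and the listener is called as fast as the data arrives.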
Consider using a Polled Consumer instead, to receive data at your desired rate.
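The idea behind a polled consumer is that your code decides when to ask for the next batch, rather than the binder pushing batches to a listener. In Spring Cloud Stream this is built around PollableMessageSource; the sketch below deliberately does not use it, so that it runs with the JDK alone. PacedPoller, pollOnce and start are hypothetical names, and the in-memory queue only stands in for the topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of consumer-driven polling at a fixed rate.
// PacedPoller is a hypothetical class, not a Spring or Kafka API.
final class PacedPoller<T> {
    private final Queue<T> source;           // stands in for the bound topic
    private final int maxBatchSize;          // plays the role of max.poll.records
    private final Consumer<List<T>> handler; // receives each batch

    PacedPoller(Queue<T> source, int maxBatchSize, Consumer<List<T>> handler) {
        this.source = source;
        this.maxBatchSize = maxBatchSize;
        this.handler = handler;
    }

    /** Takes at most maxBatchSize records, passes them to the handler, returns the count. */
    int pollOnce() {
        List<T> batch = new ArrayList<>();
        T next;
        while (batch.size() < maxBatchSize && (next = source.poll()) != null) {
            batch.add(next);
        }
        if (!batch.isEmpty()) {
            handler.accept(batch);
        }
        return batch.size();
    }

    /**
     * Calls pollOnce every periodMs milliseconds until the returned scheduler
     * is shut down. Use a thread-safe queue as the source when calling this.
     */
    ScheduledExecutorService start(long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        Queue<Integer> topic = new ArrayDeque<>(Arrays.asList(1, 2, 3, 4, 5));
        PacedPoller<Integer> poller =
                new PacedPoller<Integer>(topic, 2, batch -> System.out.println(batch));
        poller.pollOnce(); // prints [1, 2]
        poller.pollOnce(); // prints [3, 4]
        poller.pollOnce(); // prints [5]
    }
}
```

With start(5000) the handler would be called at most once every 5000 ms, however many partitions feed the source or how much data is waiting, which is the behaviour the question is after.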
Upvotes: 1