Reputation: 77
I am trying to implement a Spring Boot AWS Kinesis consumer that can be auto-scaled in order to share the load (split shard processing) with the original instance.
What I have been able to do: using the well-defined README and the examples available here: Kinesis binder docs, I have been able to start up multiple consumers that actually divide the shards for processing by supplying these properties.
On the producer, I supply partitionCount: 2 via an application property, and on the consumers I supply both the instanceIndex and the instanceCount (a rough property sketch follows below).
On consumer 1 I have instanceIndex=0 and instanceCount=2; on consumer 2 I have instanceIndex=1 and instanceCount=2.
This works fine and I have two Spring Boot applications dealing with their specific shards. But in this case I have to have a pre-configured properties file per boot application, available at startup, for them to split the load. And if I only start up the first consumer (non auto-scaled), it only processes the shards for index 0, leaving the other shards unprocessed.
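Roughly, that static split maps to properties like the following (the output binding name and the application-level placement of instanceIndex/instanceCount are assumptions on my part; they can also be set per binding):

Producer:

spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            partitionCount: 2

Consumer 1 (consumer 2 uses instanceIndex: 1):

spring:
  cloud:
    stream:
      instanceIndex: 0
      instanceCount: 2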
What I would like to do, but am not sure is possible, is to have a single consumer deployed that processes all shards. If I deploy another instance, I would like that instance to relieve the first consumer of some of the load. In other words, if I have 2 shards and one consumer, it processes both; if I then deploy another app, I would like the first consumer to process only a single shard, leaving the second shard to the second consumer.
I have tried to do this by not specifying instanceIndex or instanceCount on the consumers and only supplying the group name, but that leaves the second consumer idle until the first is shut down. FYI, I have also created my own metadata and locking tables, preventing the binder from creating the default ones.
Configurations:

Producer -----------------

originator: KinesisProducer
server:
  port: 8090
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: <stream-name>
          content-type: application/json
          producer:
            headerMode: none
            partitionKeyExpression: headers.type
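For context, this is a minimal sketch of how the producing side might set the type header that partitionKeyExpression: headers.type evaluates against; the EventPublisher class and publish method are made up for illustration, not part of the actual application:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class EventPublisher {

    @Autowired
    private Source source;

    public void publish(Object payload, String type) {
        // The 'type' header is what partitionKeyExpression: headers.type
        // resolves, so it decides which partition/shard the record goes to.
        source.output().send(MessageBuilder.withPayload(payload)
                .setHeader("type", type)
                .build());
    }
}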
Consumers -------------------------------------

originator: KinesisSink
server:
  port: 8091
spring:
  cloud:
    stream:
      kinesis:
        bindings:
          input:
            consumer:
              listenerMode: batch
              recordsLimit: 10
              shardIteratorType: TRIM_HORIZON
        binder:
          checkpoint:
            table: <checkpoint-table>
          locks:
            table: <locking-table>
      bindings:
        input:
          destination: <stream-name>
          content-type: application/json
          group: mygroup
          consumer:
            concurrency: 1
            listenerMode: batch
            useNativeDecoding: true
            recordsLimit: 10
            idleBetweenPolls: 250
            partitioned: true
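For completeness, a minimal sketch of the matching batch-mode listener; the class name is made up, and the exact element type of the list depends on the binder version and deserialization settings:

import java.util.List;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class KinesisBatchListener {

    @StreamListener(Sink.INPUT)
    public void handle(List<?> records) {
        // With listenerMode: batch, each poll delivers up to recordsLimit (10)
        // records as a single List payload.
        for (Object record : records) {
            // process each record
        }
    }
}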
Upvotes: 5
Views: 4523
Reputation: 121552
That’s correct; that’s how it works for now: if only one consumer is there, it takes all the shards for processing. The second one will take action only if the first one breaks somehow, at least for one of the shards.
Proper Kafka-like rebalancing is on our roadmap. We don’t have a solid vision for it yet, so an issue on the matter and a subsequent contribution are welcome!
Upvotes: 2