Patan

Reputation: 17883

Spring Cloud Stream AWS Kinesis binder: messages are consumed by multiple instances in the same consumer group

I have written a sample application with the Spring Cloud Stream AWS Kinesis binder:

  compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')

Code

@StreamListener(Processor.INPUT)
public void receive(Message<String> message) {
    System.out.println("Message recieved: " + message);
    System.out.println("Message Payload: " + message.getPayload());
}

application.yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: group
          destination: stream
          content-type: application/json          
        output:  
          group: group
          destination: stream
          content-type: application/json

I have started four instances of the application, on ports 8081, 8082, 8083 and 8084.

When I publish a message to the stream, most of the time more than one instance consumes it.

For example, I sent the message {"22":"11"} and it was consumed by both 8083 and 8084.

Log output of the instance on port 8084:

2018-03-16 12:29:19.715  INFO 10084 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:19.809  INFO 10084 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8084 (http) with context path ''
2018-03-16 12:29:19.809  INFO 10084 --- [           main] com.example.aws.AwsApplication           : Started AwsApplication in 21.034 seconds (JVM running for 22.975)
2018-03-16 12:29:19.840  INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:30:23.929  INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The records '[{SequenceNumber: 49582549849562056887358041088912873574803531055853731842,ApproximateArrivalTimestamp: Fri Mar 16 12:30:21 IST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=47 cap=47],PartitionKey: partitionKey-0,}]' are skipped from processing because their sequence numbers are less than already checkpointed: 49582549849562056887358041088912873574803531055853731842
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=f6cb4b6d-e149-059f-7e4d-aa9dfeeef10e, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183774995}]
Message Payload: {"22":"11"}

Log output of the instance on port 8083:

2018-03-16 12:29:05.733  INFO 8188 --- [           main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:05.733  INFO 8188 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:29:05.796  INFO 8188 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8083 (http) with context path ''
2018-03-16 12:29:05.796  INFO 8188 --- [           main] com.example.aws.AwsApplication           : Started AwsApplication in 19.463 seconds (JVM running for 20.956)
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=cf8647fe-8ce5-70b5-eeb9-74a08efc870a, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183775155}]
Message Payload: {"22":"11"}

Ideally, only one consumer in the group should handle a given message. Am I missing something here?

Upvotes: 1

Views: 863

Answers (1)

Artem Bilan

Reputation: 121282

Thank you for validating the solution!

I think I found where the problem is: it is in ShardCheckpointer.checkpoint(String sequenceNumber).

The current code looks like this:

if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    this.checkpointStore.put(this.key, sequenceNumber);
    return true;
}

There is a race condition here: both (all?) nodes check the state and read the same, smaller value from the store. So they all pass the condition and then all go on to the checkpointStore.put() call. Each of them stores the same new value and returns true, which lets the Channel Adapter on every node process the same record.
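A minimal, self-contained sketch that reproduces this interleaving in a single JVM. It assumes a local ConcurrentHashMap can stand in for the shared checkpoint store and that threads can stand in for nodes; the class name, key and sequence numbers are hypothetical. A CyclicBarrier forces every "node" to read the stored value before any of them writes:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CheckpointRace {

    // Hypothetical stand-in for the shared checkpoint store all nodes talk to.
    static final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    static final String KEY = "group:stream:shardId-000000000000"; // hypothetical key

    // Mirrors the original check-then-act logic. The barrier makes every "node"
    // read the stored value before any of them writes (the problematic interleaving).
    static boolean checkpoint(String sequenceNumber, CyclicBarrier barrier) throws Exception {
        String existingSequence = store.get(KEY);
        barrier.await();
        if (existingSequence == null ||
                new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
            store.put(KEY, sequenceNumber);
            return true;
        }
        return false;
    }

    // Runs `nodes` concurrent checkpoint attempts for the same record and
    // returns how many of them were told to process it.
    static int countWinners(int nodes) throws Exception {
        store.clear();
        store.put(KEY, "100"); // an earlier checkpoint already exists
        CyclicBarrier barrier = new CyclicBarrier(nodes);
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        try {
            Callable<Boolean> attempt = () -> checkpoint("200", barrier);
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < nodes; i++) {
                results.add(pool.submit(attempt));
            }
            int winners = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    winners++;
                }
            }
            return winners;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("nodes that will process the record: " + countWinners(2));
    }
}
```

With this interleaving both threads return true, so both "nodes" would hand the record to the listener, which matches the duplicate delivery seen on 8083 and 8084.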

My fix for this looks like:

if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    if (existingSequence != null) {
        return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
    }
    else {
        this.checkpointStore.put(this.key, sequenceNumber);
        return true;
    }
}

The condition is the same, but now I use checkpointStore.replace(), which has this contract:

/**
 * Atomically replace the value for the key in the store if the old
 * value matches the oldValue argument.
 */
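The same single-JVM harness with compare-and-set logic, again assuming a ConcurrentHashMap in place of the real store: java.util.concurrent.ConcurrentMap.replace(key, oldValue, newValue) follows the same contract as the one quoted above. One deliberate difference from the posted fix: the branch for "no checkpoint yet" uses putIfAbsent instead of put, because a plain put there still lets several nodes win when they all read null. That branch is an assumption of this sketch, not part of the fix as written; class and key names are hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AtomicCheckpoint {

    // Hypothetical stand-in for the shared checkpoint store.
    static final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    static final String KEY = "group:stream:shardId-000000000000"; // hypothetical key

    static boolean checkpoint(String sequenceNumber, CyclicBarrier barrier) throws Exception {
        String existingSequence = store.get(KEY);
        barrier.await(); // same worst-case interleaving: everyone reads first
        if (existingSequence == null ||
                new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
            if (existingSequence != null) {
                // Compare-and-set: only the node whose read is still current succeeds.
                return store.replace(KEY, existingSequence, sequenceNumber);
            }
            // No checkpoint yet (assumption beyond the posted fix): putIfAbsent
            // returns null only for the node that actually inserted the value.
            return store.putIfAbsent(KEY, sequenceNumber) == null;
        }
        return false;
    }

    // initial == null means no checkpoint has been stored yet.
    static int countWinners(String initial, int nodes) throws Exception {
        store.clear();
        if (initial != null) {
            store.put(KEY, initial);
        }
        CyclicBarrier barrier = new CyclicBarrier(nodes);
        ExecutorService pool = Executors.newFixedThreadPool(nodes);
        try {
            Callable<Boolean> attempt = () -> checkpoint("200", barrier);
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < nodes; i++) {
                results.add(pool.submit(attempt));
            }
            int winners = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    winners++;
                }
            }
            return winners;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("existing checkpoint: " + countWinners("100", 2));
        System.out.println("no checkpoint yet:   " + countWinners(null, 2));
    }
}
```

In both cases exactly one thread is allowed to process the record; the others see their replace (or putIfAbsent) fail and skip it.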

Right now I'm trying to come up with a test case to validate this, and I will let you know when a BUILD-SNAPSHOT is ready for you to consume and validate on your side.

Upvotes: 2
