Patan
Patan

Reputation: 17893

Spring Aws Kinesis Messages are not consumed in order

I am pushing 100 messages on to stream with 1 shrad.

spring:
  cloud:
    stream:
      bindings:
        myOutBound:
          destination: my-stream
          contentType: application/json

I am pushing messages in loop for testing purpose

@EnableBinding(MyBinder.class)
public class MyProcessor {

  @Autowired
  private MyBinder myBinder;

  public void processRollup() {
    List<MyObject> myObjects =  IntStream.range(1, 100)
        .mapToObj(Integer::valueOf)
        .map(s-> new MyObject(s))
        .collect(toList());
    myObjects.forEach(messagePayload ->{
      System.out.println(messagePayload.getId());
      myBinder.myOutBound()
          .send(MessageBuilder.withPayload(messagePayload)
              .build());
        }
    );
  }

}

I am consuming messages like below

spring:
  cloud:
    stream:
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-stream
          content-type: application/json

The message consumption is not ordered.

Am I missing something.

Upvotes: 1

Views: 488

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121560

There are several things to consider. First of all the producer in the Binder is based on the KinesisMessageHandler with async mode by default:

messageHandler.setSync(producerProperties.getExtension().isSync());

So, even if it looks for you that you send those messages in the right order one by one, it doesn't mean that they reach a stream on AWS at the same order.

Also there is no guarantee that they are settled on AWS in the same order anyway, even if you send them in a sync mode.

See here: Amazon Kinesis and guaranteed ordering

Also you can achieve an order guarantee within the same shard via an explicit sequenceNumber:

To guarantee strictly increasing ordering, write serially to a shard and use the SequenceNumberForOrdering parameter.

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

Unfortunately the Kinesis Binder doesn't support that option at the moment, but we can overcome it via an explicit AwsHeaders.SEQUENCE_NUMBER set into the message before sending it into the output destination of the binder:

String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
    if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
        sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
    }

Upvotes: 2

Related Questions