How to execute a Spring Integration flow in multiple threads to consume more Amazon SQS queue messages in parallel?

I need to create multiple SQS queue consumers that execute in parallel, but I don't know how to achieve this using Spring Integration.

I have the following architecture:

An Amazon SQS queue with 200k messages

An Amazon stack with 5 EC2 instances, each running a Tomcat server with a Spring Boot application. The application has a Spring Integration flow that consumes the SQS messages using sqs-message-driven-channel-adapter from spring-integration-aws (https://github.com/spring-projects/spring-integration-aws)

and publishes those messages to a REST service that has an average response time of 1 second (I cannot modify the REST service, that is a constraint, but I can send messages to it in parallel).

SQS queue -> Stack(5 tomcat instances) -> Rest Service

Constraints:

Amazon SQS allows a client to read messages in batches of at most 10 messages per request, but I can have multiple clients to consume more messages in parallel.

In Amazon SQS, messages need to be deleted manually. This is done using Spring Integration, and I delete a message only if the REST service returns OK.

I don't have a problem with possible duplicates (SQS sending the same message to two different clients).

I cannot store messages in my Spring Boot application in any way.

My Spring Integration flow:

<aws-messaging:sqs-async-client id="clientWithCredentials"/>
<int-aws:sqs-message-driven-channel-adapter
  sqs="clientWithCredentials" 
  channel="channel_1"
  queues="https://sqs.us-east-1.amazonaws.com/123456789000/SomeAmazonSQSName"
  max-number-of-messages="10"/>

<int:channel id="channel_1" />
<int:outbound-channel-adapter ref="restService" method="publish" channel="channel_1" />

How can I execute this flow in multiple threads to consume more messages in parallel?

I tried to put <int:poller fixed-rate="1" task-executor="executor" /> inside sqs-message-driven-channel-adapter, but that is not allowed.

Upvotes: 3

Views: 2431

Answers (1)

Artem Bilan

Reputation: 121167

To achieve such a requirement you can use an ExecutorChannel instead of the default DirectChannel.

This way all the SQS messages are distributed to the threads supplied by the ExecutorChannel's task executor and are therefore processed in parallel.

More info about an ExecutorChannel is in the Reference Manual.

UPDATE

So, what I suggest should be reflected in your current config like this:

<int:channel id="channel_1">
   <int:dispatcher task-executor="someExecutor"/>
</int:channel>
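
The `someExecutor` bean referenced above still has to be defined somewhere. A minimal sketch, assuming the standard Spring `task` namespace is available in the same XML file; the pool size of 10 and the rejection policy are illustrative assumptions, not values from the question:

```xml
<!-- Requires on the root <beans> element:
       xmlns:task="http://www.springframework.org/schema/task"
     with schema location
       http://www.springframework.org/schema/task
       http://www.springframework.org/schema/task/spring-task.xsd -->

<!-- Assumed sizing: 10 worker threads per application instance. -->
<!-- queue-capacity="0" avoids buffering messages in memory;      -->
<!-- CALLER_RUNS makes the sending thread do the work itself      -->
<!-- when all workers are busy, which slows the intake down.      -->
<task:executor id="someExecutor"
    pool-size="10"
    queue-capacity="0"
    rejection-policy="CALLER_RUNS"/>
```

As a rough sizing guide: with a REST call that takes about 1 second, one thread handles roughly one message per second, so 10 threads on each of the 5 instances would give about 50 messages per second, ignoring SQS polling overhead. Note also that once messages are handed to another thread, the adapter no longer waits for the REST call to finish, so check that the message deletion logic still runs only after the REST service returns OK.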

UPDATE

If you still insist on having several SQS adapters, then a simplified version looks like this:

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />


<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int-aws:sqs-message-driven-channel-adapter
    sqs="sqsAsyncClient" 
    channel="sqs-to-metricator"
    queues="https://sqs.us-east-1.amazonaws.com/123/SomeSQSQueueName"
    max-number-of-messages="10"
    />

<int:channel id="sqs-to-metricator" />

<int:outbound-channel-adapter ref="restService"
    method="publish" channel="sqs-to-metricator" />

Also, to avoid duplication you can consider switching to the Java DSL and using its IntegrationFlowContext for dynamic IntegrationFlow registrations: https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/java-dsl.html#java-dsl-runtime-flows

Upvotes: 3
