Vipul Tiwari
Vipul Tiwari

Reputation: 67

Spring integration aws (sqs) to trigger spring integration flow

I have to listen a queue using spring integration flow and intgeration sqs. Once message is received from queue it should trigger a integration flow. Below is the things which I am trying but everythings fine in but afater receiving test it is not triggering any Integration flow. Please let me know where I am doing wrong:

UPDATED as per comment from Artem

Adapter for SQS.

@Bean
  public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "Main");
    adapter.setOutputChannel(inputChannel());
    adapter.setAutoStartup(true);
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
    adapter.setMaxNumberOfMessages(10);
    adapter.setVisibilityTimeout(5);
    return adapter;
  }

Queue configured:

@Bean
  public MessageChannel inputChannel() {
    return new DirectChannel();
  }

Now the main integration flow trigger point:

@Bean
  public IntegrationFlow inbound() {
    return IntegrationFlows.from("inputChannel").transform(i -> "TEST_FLOW").get();
  }
  }

Appreciate any type of help.

Upvotes: 2

Views: 568

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121552

  1. The sqsMessageDrivenChannelAdapter() must be declared as a @Bean
  2. The inbound() must be declared as a @Bean
  3. This one fully does not make sense IntegrationFlows.from(MessageChannels.queue()). What is the point to start the flow from anonymous channel? Who and how is going to produce messages to that channel?

Make yourself familiar with different channels: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations

Pay attention that QueueChannel must be consumed via polling endpoint. Right, there is a default poller auto-configured by Spring Boot, but it is based on a single thread in the TaskScheduler and has a polling period as 10 millis.

I wouldn't recommend to hand off SQS messages to the QueueChannel: when consumer fails, you lose the data. It is better to process those messages in the consumer thread.

Otherwise your intention is not clear in the provided code.

Can you, please, share with us what error you get or anything else?

You also can turn on DEBUG logging level for org.springframework.integration to see how your messages are processed.

Upvotes: 2

Related Questions