Reputation: 67
I have to listen to a queue using a Spring Integration flow and Spring Integration SQS. Once a message is received from the queue, it should trigger an integration flow. Below is what I have tried. Everything looks fine, but after receiving a test message it does not trigger any integration flow. Please let me know where I am going wrong:
UPDATED as per comment from Artem
Adapter for SQS.
@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "Main");
    adapter.setOutputChannel(inputChannel());
    adapter.setAutoStartup(true);
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.NEVER);
    adapter.setMaxNumberOfMessages(10);
    adapter.setVisibilityTimeout(5);
    return adapter;
}
Channel configured:
@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}
Now the main integration flow trigger point:
@Bean
public IntegrationFlow inbound() {
    return IntegrationFlows.from("inputChannel").transform(i -> "TEST_FLOW").get();
}
}
Appreciate any type of help.
Upvotes: 2
Views: 568
Reputation: 121552
sqsMessageDrivenChannelAdapter() must be declared as a @Bean.
inbound() must be declared as a @Bean.
IntegrationFlows.from(MessageChannels.queue()): what is the point of starting the flow from an anonymous channel? Who is going to produce messages to that channel, and how? Make yourself familiar with the different channel implementations: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations
Pay attention that a QueueChannel must be consumed via a polling endpoint. Right, there is a default poller auto-configured by Spring Boot, but it is based on a single thread in the TaskScheduler and has a polling period of 10 milliseconds.
I wouldn't recommend handing off SQS messages to a QueueChannel: when the consumer fails, you lose the data. It is better to process those messages in the consumer thread.
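The difference between the two handoff styles can be sketched in plain Java. This is only an analogy built on java.util.concurrent, not Spring Integration API: the class HandoffDemo and its methods are hypothetical names introduced for illustration. In a direct handoff the handler runs on the caller's thread, so the caller sees the failure and can leave the message unacknowledged. In a queued handoff the caller returns as soon as the message is buffered, and a later failure on the polling side is invisible to it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical illustration; none of these names come from Spring Integration.
class HandoffDemo {

    // Direct handoff: the handler runs on the caller's thread, so a failure
    // is visible to the caller, which can decline to acknowledge the message.
    static boolean directHandoff(String message, Consumer<String> handler) {
        try {
            handler.accept(message);
            return true;   // processed, safe to acknowledge
        } catch (RuntimeException e) {
            return false;  // caller knows processing failed
        }
    }

    // Queued handoff: the caller only buffers the message and returns.
    // It reports success before any processing has happened.
    static boolean queuedHandoff(String message, BlockingQueue<String> queue) {
        return queue.offer(message);
    }

    // Stand-in for a polling consumer: drains the queue and counts messages
    // whose handler failed. The original sender never learns about these.
    static int drain(BlockingQueue<String> queue, Consumer<String> handler) {
        int lost = 0;
        String message;
        while ((message = queue.poll()) != null) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                lost++;
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> {
            throw new IllegalStateException("cannot process " + m);
        };

        System.out.println("direct acknowledged: " + directHandoff("m1", failing));

        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println("queued acknowledged: " + queuedHandoff("m1", queue));
        System.out.println("lost after polling: " + drain(queue, failing));
    }
}
```

With a failing handler, the direct variant reports the failure to the sender, while the queued variant reports success first and drops the message later. That is the reason for preferring a DirectChannel here.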
Otherwise, your intention is not clear from the provided code.
Can you please share what error you get, or anything else?
You can also turn on the DEBUG logging level for org.springframework.integration to see how your messages are processed.
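Assuming this is a Spring Boot application (the question does not confirm it), one way to enable that logging is a single line in application.properties:

```properties
# Log Spring Integration internals at DEBUG to trace message handling
logging.level.org.springframework.integration=DEBUG
```

In a non-Boot setup, the same logger name would be configured in whatever logging framework the project uses.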
Upvotes: 2