Reputation: 365
How can I set up a reactive flow using the DSL for the following steps:
SqsMessageDrivenChannelAdapter
JsonSchemaValidator (class with validate method)
BusinessService (business logic, state machine)
I was looking at this: https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
In the above example, dedicated flows are created that return a Publisher, and in the tests those Publishers are subscribed to. However, my flow will be triggered when SqsMessageDrivenChannelAdapter brings a message into a channel.
How can I achieve a reactive flow configuration for the scenario above, steps 1 to 5?
Update : Sample code added
@Bean
public IntegrationFlow importFlow() {
    // The flow must be returned so that Spring registers it as a bean.
    return IntegrationFlows.from(sqsInboundChannel())
            .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
            .transform(Transformers.fromJson(Entity.class))
            .handle((payload, messageHeaders) -> businessService.process((Entity) payload))
            .handle(
                    Jpa.outboundAdapter(this.entityManagerFactory)
                            .entityClass(Entity.class)
                            .persistMode(PersistMode.PERSIST),
                    ConsumerEndpointSpec::transactional)
            .get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
            new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
    sqsMessageDrivenChannelAdapter.setAutoStartup(true);
    sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
    return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
    return MessageChannels.flux().get();
}
Update 2 : Moved JPA to a different thread using an executor channel
@Bean
public IntegrationFlow importFlow() {
    // The flow must be returned so that Spring registers it as a bean.
    return IntegrationFlows.from(sqsInboundChannel())
            .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
            .transform(Transformers.fromJson(Entity.class))
            .handle((payload, messageHeaders) -> businessService.process((Entity) payload))
            // Hand off to the executor channel so persistence runs on another thread.
            .channel(persistChannel())
            .handle(
                    Jpa.outboundAdapter(this.entityManagerFactory)
                            .entityClass(Entity.class)
                            .persistMode(PersistMode.PERSIST),
                    ConsumerEndpointSpec::transactional)
            .get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
            new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
    sqsMessageDrivenChannelAdapter.setAutoStartup(true);
    sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
    return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
    return MessageChannels.flux().get();
}
@Bean
public MessageChannel persistChannel() {
    return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
Upvotes: 0
Views: 910
Reputation: 121442
You probably need to make yourself more familiar with what we have so far for Reactive Streams in Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams
The test class you point to is not relevant to your use case. Those tests cover parts of the API we expose in Spring Integration, more or less as unit tests. They have nothing to do with a whole end-to-end flow.
Your use case is really just a complete black-box flow, starting with an SQS listener and ending in R2DBC. Therefore there is no point in converting part of your flow into a Publisher and then feeding it back into another part of the flow: you are not going to track that Publisher and subscribe to it yourself.
You may consider placing a FluxMessageChannel in between endpoints in your flow, but it still does not make much sense for your use case. It won't be fully reactive in the way you expect, because the org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer does not block on its consumer thread waiting for back-pressure from downstream.
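To make the back-pressure point concrete, here is a minimal JDK-only sketch of a source that does wait for downstream demand. It uses java.util.concurrent.Flow and SubmissionPublisher, not Spring Integration or the SQS listener container; the class name, method name and buffer size are illustrative assumptions. The subscriber requests one item at a time, and the producing thread blocks in submit() whenever the small buffer is full, which is the behaviour the listener container does not provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackPressureDemo {

    /** Subscriber that signals demand for exactly one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: one item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the messages and returns them in the order the subscriber received them. */
    public static List<String> consume(List<String> messages) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try {
            // Small per-subscriber buffer (2): submit() blocks while the buffer is full.
            try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, 2)) {
                publisher.subscribe(subscriber);
                for (String message : messages) {
                    publisher.submit(message);
                }
            } // close() triggers onComplete once the buffered items have been delivered
            subscriber.done.await(5, TimeUnit.SECONDS);
            return subscriber.received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for completion", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Calling `BackPressureDemo.consume(List.of("a", "b", "c"))` delivers the three items in order, with the producer never running more than the buffer size ahead of the subscriber.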
The only truly reactive part of your flow is the R2DBC outbound channel adapter, but it probably does not bring you much value because the source of the data is not reactive.
As I said, you can try to place a channel(channels -> channels.flux()) just after the SqsMessageDrivenChannelAdapter definition to start a reactive flow from that point. At the same time, you should try setting maxNumberOfMessages to 1, so that the adapter waits for free space downstream before pulling the next message from SQS.
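A rough sketch of that suggestion could look like the following. It assumes Spring Integration 5.x with spring-integration-aws 2.x (where the adapter takes an AmazonSQSAsync client); the configuration class name, the queue name "my-queue" and the final printing handler are hypothetical placeholders, not part of the original flow.

```java
import com.amazonaws.services.sqs.AmazonSQSAsync;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.inbound.SqsMessageDrivenChannelAdapter;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
public class ReactiveSqsFlowConfig {

    @Bean
    public IntegrationFlow importFlow(AmazonSQSAsync asyncSqsClient) {
        // "my-queue" is a hypothetical queue name used only for illustration.
        SqsMessageDrivenChannelAdapter adapter =
                new SqsMessageDrivenChannelAdapter(asyncSqsClient, "my-queue");
        // Pull one message per poll so the listener does not run far ahead of the flow.
        adapter.setMaxNumberOfMessages(1);

        return IntegrationFlows.from(adapter)
                // FluxMessageChannel right after the adapter: the flow is reactive from here on.
                .channel(channels -> channels.flux())
                // Placeholder one-way endpoint; validation, business logic and
                // persistence would replace it in the real flow.
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }
}
```

Passing the adapter directly to IntegrationFlows.from(...) lets the DSL wire its output channel, so a separate sqsInboundChannel bean is not needed in this variant. As noted above, the SQS listener itself still does not react to downstream demand, so this only limits how far the source can run ahead.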
Upvotes: 1