Reputation: 205
I'm playing a little with Reactive Spring Integration and I try to perform the following basic operation:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}
But this doesn't work since the framework won't recognize that flux
is a Flux<?>
instance. The framework treats this as a Message<?>
and I don't know how and where can I start to write Reactor code.
Upvotes: 1
Views: 1130
Reputation: 121282
That's correct. The ReactiveMessageHandlerAdapter
expects a lambda for the ReactiveMessageHandler
which contract is:
Mono<Void> handleMessage(Message<?> message);
I'm not sure what drove you to think that input has to be a Flux
.
I assume your someReactiveInboundChannelAdapter
is an extension of the MessageProducerSupport
and does exactly subscribeToPublisher(Publisher<? extends Message<?>> publisher)
logic. The point of this is to take data from the source in reactive manner, but still produce every event as a message to downstream channel. So, that should become clear that a handle()
is going to receive a message with a single item, not the whole Flux
.
If you want to see the flow as a flux, consider to use a fluxTransform()
operator.
Also it is better to not subscribe yourself, but let to do that in the framework, when configuration and startup is over.
Technically you should not think about reactive types. You just need to configure a flow respectively and write a logic only for individual item: the framework does a reactive interaction for you. The Project Reactor is a library around Flux
and Mono
. That's where we talk about reactive types. The Spring Integration is a messaging framework where its communication is done via messages. And in the end for every single message it must not matter if interaction between endpoints is done in reactive manner or not. Therefore your processing logic can be free from reactive types.
UDATE
If you want the full control of the Flux
from that someReactiveInboundChannelAdapter
, then you need to do like this:
@Bean
public Publisher<Message<Object>> reactiveFlow() {
return IntegrationFlows.from(someReactiveInboundChannelAdapter)
.toReactivePublisher();
}
and then inject that Publisher
whenever you need to use it. Then do Flux.from(publisher)
and whatever reactive operators you need, including subscribe()
.
Some sample of that is here: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
The IntegrationFlowAdapter
cannot be used for this type of configuration since it cannot accept the IntegrationFlow
as a result of the buildFlow()
.
The fluxTransform()
can do that for you as well, though:
.fluxTransform(flux -> flux.as(Mono::just))
So, the payload of downstream flow is going to be a Flux<Message<?>>
, which you can handle yourself. The returned Mono
from that fluxTransform()
is going to be subscribed by the framework. That Flux
of its value is your responsibility in the downstream flow. Then you can use the plain MessageHandler
:
.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())
Upvotes: 1