ropes-nopes

Reputation: 205

How to handle reactive types within Reactive Spring Integration?

I'm experimenting with Reactive Spring Integration and trying to perform the following basic operation:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(someReactiveInboundChannelAdapter)
                .handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
    }

But this doesn't work, since the framework doesn't recognize that flux is a Flux<?> instance. It treats the argument as a Message<?>, and I don't know how or where I can start writing Reactor code.

Upvotes: 1

Views: 1130

Answers (1)

Artem Bilan

Reputation: 121282

That's correct. The ReactiveMessageHandlerAdapter expects a lambda for the ReactiveMessageHandler, whose contract is:

Mono<Void> handleMessage(Message<?> message);

I'm not sure what drove you to think that input has to be a Flux.

I assume your someReactiveInboundChannelAdapter is an extension of MessageProducerSupport and delegates to its subscribeToPublisher(Publisher<? extends Message<?>> publisher) logic. The point of this is to take data from the source in a reactive manner, but still produce every event as a separate message to the downstream channel. So it should be clear that handle() is going to receive a message with a single item, not the whole Flux.
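
As a minimal sketch of that contract, the handler from the question could be rewritten to work on one message at a time and return a Mono<Void>. It is a fragment of the same IntegrationFlowAdapter subclass, not a standalone program; someReactiveInboundChannelAdapter is the adapter from the question, and printing the payload is only a placeholder for real logic:

```java
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
import reactor.core.publisher.Mono;

// Inside the IntegrationFlowAdapter subclass from the question
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
    return from(someReactiveInboundChannelAdapter)
            .handle(new ReactiveMessageHandlerAdapter(
                    // The lambda is a ReactiveMessageHandler: it receives one
                    // Message<?> per emitted item and returns a Mono<Void>.
                    message -> Mono.fromRunnable(
                            () -> System.out.println(message.getPayload()))));
}
```

Note that the lambda does not call subscribe(): the returned Mono is subscribed by the framework.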

If you want to see the flow as a Flux, consider using the fluxTransform() operator.
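
A rough sketch of what that might look like, again as a fragment of the IntegrationFlowAdapter subclass. The Object and String type arguments and the upper-casing step are assumptions made for illustration only:

```java
import org.springframework.integration.dsl.IntegrationFlowDefinition;

// Inside the IntegrationFlowAdapter subclass from the question
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
    return from(someReactiveInboundChannelAdapter)
            // The function sees the upstream data as a Flux<Message<Object>>
            .<Object, String>fluxTransform(flux -> flux
                    .map(message -> String.valueOf(message.getPayload()))
                    .map(String::toUpperCase))
            // Downstream, each transformed item arrives as an individual message
            .handle(message -> System.out.println(message.getPayload()));
}
```

Here too there is no explicit subscribe(); the framework subscribes to the resulting publisher when the flow starts.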

It is also better not to subscribe yourself, but to let the framework do that once configuration and startup are complete.

Technically, you should not have to think about reactive types. You just need to configure the flow accordingly and write logic for an individual item only: the framework handles the reactive interaction for you. Project Reactor is a library built around Flux and Mono; that is where we talk about reactive types. Spring Integration is a messaging framework in which communication is done via messages. In the end, for any single message it must not matter whether the interaction between endpoints is reactive or not. Therefore, your processing logic can be free of reactive types.

UPDATE

If you want full control of the Flux from that someReactiveInboundChannelAdapter, then you need to do something like this:

    @Bean
    public Publisher<Message<Object>> reactiveFlow() {
        return IntegrationFlows.from(someReactiveInboundChannelAdapter)
                .toReactivePublisher();
    }

and then inject that Publisher wherever you need to use it. Then call Flux.from(publisher) and apply whatever reactive operators you need, including subscribe().
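
A minimal sketch of such a consumer, assuming the reactiveFlow bean above is the only Publisher<Message<Object>> in the context. The class name ReactiveFlowConsumer and its start() method are hypothetical names introduced for illustration:

```java
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

@Component
public class ReactiveFlowConsumer { // hypothetical class name

    private final Publisher<Message<Object>> reactiveFlow;

    // The Publisher bean returned by reactiveFlow() is injected here
    public ReactiveFlowConsumer(Publisher<Message<Object>> reactiveFlow) {
        this.reactiveFlow = reactiveFlow;
    }

    // Hypothetical entry point: wraps the Publisher in a Flux and subscribes
    public Disposable start() {
        return Flux.from(this.reactiveFlow)
                .map(Message::getPayload)
                .subscribe(System.out::println);
    }
}
```

In this variant the subscription is your own responsibility, so start() would have to be called by your code once the application context is up.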

A sample of that is here: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

The IntegrationFlowAdapter cannot be used for this type of configuration since it cannot accept the IntegrationFlow as a result of the buildFlow().

The fluxTransform() can do that for you as well, though:

.fluxTransform(flux -> flux.as(Mono::just))

So the payload in the downstream flow is going to be a Flux<Message<?>>, which you can handle yourself. The Mono returned from that fluxTransform() is subscribed by the framework; the Flux it carries as its value is your responsibility in the downstream flow. You can then use a plain MessageHandler:

.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())

Upvotes: 1
