user7551211
user7551211

Reputation: 717

Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Project Reactor

I'm getting an error of Caused by: java.lang.IllegalStateException: No subscriptions have been created when I use Spring Integration with Project Reactor and I try to figure out how can I subscribe. My original code was:

    @Bean
    public IntegrationFlow writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
    }

After it threw the error I've tried to subscribe but I've couldn't understand what should I pass to the subscribe method, that seems to act differently than the regular reactive .subscribe().

    @Bean
    public void writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .toReactivePublisher().subscribe(value -> {
                log.info("Wrote: " + value);
            });
    }

Upvotes: 0

Views: 198

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121482

Do that .toReactivePublisher().subscribe() combination is not correct. The IntegrationFlow must be first exposed and configured as a bean. And only then, after injecting this bean somewhere in your service you can subscribe() to that Publisher bean.

You are missing the fact that inversion of control has to be initialized first in its dependency injection container and only after that we can do some real work (subscribe) with those beans.

EDIT

For example my test-case:

@SpringJUnitConfig
@DirtiesContext
public class ReactiveStreamsTests {

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private Publisher<Message<Integer>> pollablePublisher;

    @Autowired
    private AbstractEndpoint reactiveTransformer;

    @Autowired
    @Qualifier("inputChannel")
    private MessageChannel inputChannel;

    @Test
    void testPollableReactiveFlow() throws Exception {
        assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
        this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));

        CountDownLatch latch = new CountDownLatch(6);

        Flux.from(this.pollablePublisher)
                .take(6)
                .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .doOnNext(p -> latch.countDown())
                .subscribe();

        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<List<Integer>> future =
                exec.submit(() ->
                        Flux.just("11,12,13")
                                .map(v -> v.split(","))
                                .flatMapIterable(Arrays::asList)
                                .map(Integer::parseInt)
                                .<Message<Integer>>map(GenericMessage::new)
                                .concatWith(this.pollablePublisher)
                                .take(7)
                                .map(Message::getPayload)
                                .collectList()
                                .block(Duration.ofSeconds(10))
                );

        this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));

        assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
        List<Integer> integers = future.get(20, TimeUnit.SECONDS);

        assertThat(integers).isNotNull();
        assertThat(integers.size()).isEqualTo(7);
        exec.shutdownNow();
    }

    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public Publisher<Message<Integer>> pollableReactiveFlow() {
            return IntegrationFlows
                    .from("inputChannel")
                    .split(s -> s.delimiters(","))
                    .<String, Integer>transform(Integer::parseInt,
                            e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                    .channel(MessageChannels.queue())
                    .log()
                    .toReactivePublisher();
        }

    }

}

Upvotes: 1

Related Questions