Reputation: 717
I'm getting an error of Caused by: java.lang.IllegalStateException: No subscriptions have been created
when I use Spring Integration with Project Reactor and I try to figure out how can I subscribe. My original code was:
@Bean
public IntegrationFlow writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
}))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
.get();
}
After it threw the error I've tried to subscribe but I've couldn't understand what should I pass to the subscribe method, that seems to act differently than the regular reactive .subscribe()
.
@Bean
public void writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
}))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
.toReactivePublisher().subscribe(value -> {
log.info("Wrote: " + value);
});
}
Upvotes: 0
Views: 198
Reputation: 121482
Do that .toReactivePublisher().subscribe()
combination is not correct. The IntegrationFlow
must be first exposed and configured as a bean. And only then, after injecting this bean somewhere in your service you can subscribe()
to that Publisher
bean.
You are missing the fact that inversion of control has to be initialized first in its dependency injection container and only after that we can do some real work (subscribe) with those beans.
EDIT
For example my test-case:
@SpringJUnitConfig
@DirtiesContext
public class ReactiveStreamsTests {
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;
@Autowired
private AbstractEndpoint reactiveTransformer;
@Autowired
@Qualifier("inputChannel")
private MessageChannel inputChannel;
@Test
void testPollableReactiveFlow() throws Exception {
assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
CountDownLatch latch = new CountDownLatch(6);
Flux.from(this.pollablePublisher)
.take(6)
.filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.doOnNext(p -> latch.countDown())
.subscribe();
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<List<Integer>> future =
exec.submit(() ->
Flux.just("11,12,13")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.<Message<Integer>>map(GenericMessage::new)
.concatWith(this.pollablePublisher)
.take(7)
.map(Message::getPayload)
.collectList()
.block(Duration.ofSeconds(10))
);
this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));
assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
List<Integer> integers = future.get(20, TimeUnit.SECONDS);
assertThat(integers).isNotNull();
assertThat(integers.size()).isEqualTo(7);
exec.shutdownNow();
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
return IntegrationFlows
.from("inputChannel")
.split(s -> s.delimiters(","))
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
.channel(MessageChannels.queue())
.log()
.toReactivePublisher();
}
}
}
Upvotes: 1