Reputation: 309
Just a quick one.
Is it possible to have a reactive @StreamListener Input with Spring Cloud Stream? What I mean by that is something like this:
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.log();
}
In the documentation I only saw examples like this:
@StreamListener(Sink.INPUT)
public void log(String message)
{
log.info(message);
}
or of Processors like this:
@StreamListener
@Output(Source.OUTPUT)
public Flux<String> log(@Input(Sink.INPUT) Flux<String> strings)
{
return strings.map(String::toUpperCase);
}
When I run the code from the first snippet I get a Dispatcher has no subscribers
exception.
Upvotes: 2
Views: 1512
Reputation: 2400
The first two are equivalent. Open a GitHub issue if you run into errors with it.
EDIT:
The two annotations are equivalent, but the listener will only subscribe to an input if there is a subscription on the input flux. In order to be entirely equivalent, the reactive one should be:
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.log().subscribe();
}
or
@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
strings.subscribe(log::info);
}
Simply calling log()
on the flux creates another Flux
without a subscription of its own.
Upvotes: 3