wookie
wookie

Reputation: 309

Reactive @StreamListener

Just a quick one.

Is it possible to have a reactive @StreamListener Input with Spring Cloud Stream? What I mean by that is something like this:

@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
    strings.log();
}

In the documentation I only saw examples like this:

@StreamListener(Sink.INPUT)
public void log(String message)
{
    log.info(message);
}

or of Processors like this:

@StreamListener
@Output(Source.OUTPUT)
public Flux<String> log(@Input(Sink.INPUT) Flux<String> strings)
{
    return strings.map(String::toUpperCase);
}

When I run the code from the first snippet I get a Dispatcher has no subscribers exception.

Upvotes: 2

Views: 1512

Answers (1)

Marius Bogoevici
Marius Bogoevici

Reputation: 2400

The first two are equivalent. Open a GitHub issue if you run into errors with it.

EDIT:

The two annotations are equivalent, but the listener will only subscribe to an input if there is a subscription on the input flux. In order to be entirely equivalent, the reactive one should be:

@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
    strings.log().subscribe();
}

or

@StreamListener
public void log(@Input(Sink.INPUT) Flux<String> strings)
{
    strings.subscribe(log::info);
}

Simply calling log() on the flux creates another Flux without a subscription of its own.

Upvotes: 3

Related Questions