Reputation: 345
I have a data source service, which takes an observer as a parameter.
void subscribe(Consumer onEventConsumer);
I want to use flux as a response stream for RSocket. How can I do this? As I see it now, it should be something like
Flux<T> controllerMethod(RequestMessage mgs) {
var flux = Flux.empty();
dataSource.subscribe(event -> flux.push(event));
return flux;
}
But I have big doubts that it's a proper solution, and I'm new in the reactive approach, I don't know what methods I should use here?
Upvotes: 4
Views: 5009
Reputation: 3558
As Simon already pointed out, this is what you use Flux.create
for.
Take a look at the Getting Started Guide on projectreactor.io.
In short, you register a custom listener inside the lambda of the create
method:
Flux<String> bridge = Flux.create(sink -> {
myEventProcessor.register(
new MyEventListener<String>() {
public void onDataChunk(List<String> chunk) {
for(String s : chunk) {
sink.next(s);
}
}
public void processComplete() {
sink.complete();
}
});
});
What you want to do is to pass the incoming elements on to a FluxSink, which will then publish those elements on the Flux.
Upvotes: 3
Reputation: 28351
this is a typical use case of Flux.create. you register an obsereer from inside the create lambda, which will pass the data it receives down to the provided FluxSink
Upvotes: 0