Reputation: 201
I am trying to create a Reactor Flux from a BlockingQueue, but I am not sure which operator best fits my use case.
I am building a streaming REST endpoint whose response is a Flux that must keep emitting messages from a BlockingQueue in response to a GET call.
I have searched forums and the documentation, but I can only find examples of a Flux created from iterable collections or reactive data sources, and none created from a BlockingQueue.
Upvotes: 3
Views: 7067
Reputation: 381
An alternative worth considering is to get rid of BlockingQueue and use Sinks instead.
This requires:
Creating a sink, e.g.
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Exposing the sink as a Flux:
sink.asFlux()
Pushing to the sink wherever you previously put messages on the queue:
sink.tryEmitNext("SOME MESSAGE");
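Put together, the three steps above could look roughly like the sketch below. It assumes reactor-core 3.4 or later on the classpath; the class name MessageBroadcaster and its method names are made up for illustration and are not part of Reactor.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical holder class; the name and methods are illustrative only.
final class MessageBroadcaster {

    // Multicast sink that buffers elements emitted before the first subscriber arrives.
    private final Sinks.Many<String> sink =
            Sinks.many().multicast().onBackpressureBuffer();

    // Producers call this where they used to put messages on the queue.
    // Returns false if the sink rejected the element (e.g. buffer overflow or already terminated).
    boolean publish(String message) {
        Sinks.EmitResult result = sink.tryEmitNext(message);
        return result.isSuccess();
    }

    // The REST handler can return this Flux as the streaming response.
    Flux<String> messages() {
        return sink.asFlux();
    }
}
```

One thing to watch: tryEmitNext reports a failure result instead of throwing, so it is worth checking the returned EmitResult, especially if several threads publish at the same time.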
Upvotes: 2
Reputation: 3063
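You can try Flux#generate with Queue#poll. Just keep in mind that poll will return null if the queue is empty, and null cannot be passed to onNext. (Queue#peek is not suitable here: it does not remove the head of the queue, so the generator would emit the same element on every call.)
Something like this, assuming a Queue<String> named queue:
Flux.<String>generate(sink -> {
    // poll() removes and returns the head, or returns null if the queue is empty
    String element = queue.poll();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});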
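The generator above completes the Flux as soon as the queue is empty. For a streaming endpoint that should stay open, one possible variant is to block on BlockingQueue#take instead and move the subscription to a scheduler meant for blocking work. This is only a sketch assuming reactor-core is available; QueueFlux and fromBlockingQueue are hypothetical names, not Reactor API.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.BlockingQueue;

// Hypothetical helper; the class and method names are illustrative only.
final class QueueFlux {

    // Emits queue elements one by one and waits (blocks) while the queue is empty.
    static <T> Flux<T> fromBlockingQueue(BlockingQueue<T> queue) {
        return Flux.<T>generate(sink -> {
                    try {
                        // take() blocks until an element is available, so it never returns null
                        sink.next(queue.take());
                    } catch (InterruptedException e) {
                        // restore the interrupt flag and end the stream with an error
                        Thread.currentThread().interrupt();
                        sink.error(e);
                    }
                })
                // run the blocking generator on a thread pool intended for blocking calls
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```

Because generate only checks for cancellation between elements, a take() that is already waiting may keep its thread occupied for a while after the client disconnects.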
There is also the Flux#repeatWhen operator, in case you want to re-subscribe to the queue after it was considered empty, e.g. with:
flux.repeatWhen(it -> it.delayElements(Duration.ofSeconds(1)))
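For completeness, here is how draining the queue and repeatWhen might fit together in one method. Treat it as a sketch under the assumption that reactor-core is on the classpath; PollingQueueFlux, poll and the interval parameter are hypothetical names chosen for this example.

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Queue;

// Hypothetical helper; the class and method names are illustrative only.
final class PollingQueueFlux {

    // Drains the queue, completes when it is empty, then re-subscribes after the given interval.
    static <T> Flux<T> poll(Queue<T> queue, Duration interval) {
        Flux<T> drainOnce = Flux.<T>generate(sink -> {
            T element = queue.poll(); // non-blocking; null means the queue is empty
            if (element == null) {
                sink.complete();
            } else {
                sink.next(element);
            }
        });
        // Each completion of drainOnce is delayed by the interval and then triggers a re-subscription.
        return drainOnce.repeatWhen(completions -> completions.delayElements(interval));
    }
}
```

The resulting Flux never completes on its own; it ends when the subscriber cancels, which for a streaming endpoint typically means the client has disconnected.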
Upvotes: 11