Reputation: 609
Could this be a correct way to dispatch A common topic information through browser client?
@RestController
public class GenvScriptHandler {
DirectProcessor<String> topicData = DirectProcessor.create();
FluxSink<String> sink;
int test;
@GetMapping(value = "/addTopic")
public void addTopic() {
if (sink == null) {
sink = topicData.sink();
}
sink.next(String.valueOf(test++));
}
@GetMapping(value = "/getTopic", produces = "text/event-stream")
public Flux<String> getTopic() {
Flux<String> autoConnect = topicData.publish().autoConnect();
return autoConnect;
}
}
As I Use a DirectProcessor there is no backpressure possible, I wonder how the flux is consumed when sending through sse. Does a subscriber can request less than the number element pushed in the flux?
http://projectreactor.io/docs/core/release/reference/#_directprocessor
As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.
Upvotes: 0
Views: 1697
Reputation: 609
Subscribing with a SSE request, does a request(1) and not a request(Integer.MAX_VALUE)
So if I sink * 1000 times, the Processor OverLoad and an exception is thrown, even if it has subscribers:
reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
Safer to Use an EmitterProcessor or a ReplayProcessor in my case
Upvotes: 2