bodtx

Reputation: 609

Server-sent events with Spring WebFlux and Reactor

Is this a correct way to dispatch information on a common topic to browser clients?

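import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

@RestController
public class GenvScriptHandler {

    // Hot source shared by all requests: values pushed by /addTopic reach current /getTopic subscribers.
    DirectProcessor<String> topicData = DirectProcessor.create();
    FluxSink<String> sink;
    int test;

    // Pushes the next counter value into the processor.
    @GetMapping(value = "/addTopic")
    public void addTopic() {
        if (sink == null) {
            sink = topicData.sink();
        }
        sink.next(String.valueOf(test++));
    }

    // Streams the topic to the browser as server-sent events.
    @GetMapping(value = "/getTopic", produces = "text/event-stream")
    public Flux<String> getTopic() {
        Flux<String> autoConnect = topicData.publish().autoConnect();

        return autoConnect;
    }
}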

Since I use a DirectProcessor, no backpressure handling is possible, so I wonder how the Flux is consumed when it is sent through SSE. Can a subscriber request fewer elements than the number pushed into the Flux?

http://projectreactor.io/docs/core/release/reference/#_directprocessor

As a consequence, a DirectProcessor signals an IllegalStateException to its subscribers if you push N elements through it but at least one of its subscribers has requested less than N.

Upvotes: 0

Views: 1697

Answers (1)

bodtx

Reputation: 609

Subscribing through an SSE request issues a request(1), not an unbounded request(Long.MAX_VALUE).

So if I call sink.next() 1000 times, the processor overflows and an exception is thrown, even though it has subscribers:

reactor.core.Exceptions$OverflowException: Can't deliver value due to lack of requests
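
To make the mechanism concrete, here is a minimal sketch of what happens when a source with no buffer meets a subscriber that has only requested one element. It is not Reactor code: it uses the JDK's java.util.concurrent.Flow interfaces so it runs without dependencies, and DirectPublisher, its next method, and the error type are hypothetical stand-ins I made up for illustration. The subscriber here requests once and never again, which is a simplification of a slow SSE consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

public class DirectOverflowSketch {

    /** Hypothetical stand-in for a bufferless processor: one subscriber, no queue. */
    static final class DirectPublisher<T> implements Flow.Publisher<T> {
        private Flow.Subscriber<? super T> subscriber;
        private final AtomicLong demand = new AtomicLong();
        private boolean terminated;

        @Override
        public void subscribe(Flow.Subscriber<? super T> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { demand.addAndGet(n); }
                @Override public void cancel() { terminated = true; }
            });
        }

        /** Pushes a value; with no outstanding demand there is nowhere to store it. */
        void next(T value) {
            if (terminated || subscriber == null) {
                return;
            }
            if (demand.get() == 0) {
                terminated = true;
                subscriber.onError(new IllegalStateException(
                        "Can't deliver value due to lack of requests"));
                return;
            }
            demand.decrementAndGet();
            subscriber.onNext(value);
        }
    }

    /** Subscribes with a single request(1), pushes the given number of values, returns the signals seen. */
    public static List<String> run(int pushes) {
        List<String> log = new ArrayList<>();
        DirectPublisher<String> publisher = new DirectPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        for (int i = 0; i < pushes; i++) {
            publisher.next(String.valueOf(i));
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [next:0, error:Can't deliver value due to lack of requests]
        System.out.println(run(3));
    }
}
```

Only the first value is delivered; the second push finds no demand and terminates the subscriber with an error, and later pushes are ignored.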

It is safer to use an EmitterProcessor or a ReplayProcessor in my case.
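
The reason a buffering processor helps can be sketched the same way: values that arrive without demand are held in a queue and handed over when the subscriber requests more. Again, this is a JDK-only illustration of the general idea, not Reactor's implementation. BufferingPublisher is a hypothetical name, the queue is unbounded for simplicity, and the code is single-threaded and not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

public class BufferingSketch {

    /** Hypothetical stand-in for a buffering processor: queues values until demand arrives. */
    static final class BufferingPublisher<T> implements Flow.Publisher<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private Flow.Subscriber<? super T> subscriber;
        private long demand;
        private boolean draining;

        @Override
        public void subscribe(Flow.Subscriber<? super T> s) {
            subscriber = s;
            s.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { demand += n; drain(); }
                @Override public void cancel() { subscriber = null; }
            });
        }

        /** Stores the value, then delivers as much as current demand allows. */
        void next(T value) {
            queue.add(value);
            drain();
        }

        private void drain() {
            if (draining) {
                return; // guard against re-entrant request() calls from onNext
            }
            draining = true;
            try {
                while (subscriber != null && demand > 0 && !queue.isEmpty()) {
                    demand--;
                    subscriber.onNext(queue.poll());
                }
            } finally {
                draining = false;
            }
        }
    }

    /** Pushes all values while the subscriber has requested only one, then lets it catch up. */
    public static List<String> run(int pushes) {
        List<String> received = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        BufferingPublisher<String> publisher = new BufferingPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { held[0] = s; s.request(1); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { received.add("error:" + t.getMessage()); }
            @Override public void onComplete() { }
        });
        for (int i = 0; i < pushes; i++) {
            publisher.next(String.valueOf(i)); // excess values wait in the queue
        }
        while (received.size() < pushes) {
            held[0].request(1); // slow consumer asks for one element at a time
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(1000).size()); // prints 1000
    }
}
```

The trade-off is memory: the queue grows while the consumer lags, so a real setup would need a bounded buffer or an explicit overflow strategy.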

Upvotes: 2
