John Zhang
John Zhang

Reputation: 1142

Spring WebFlux (Flux): how to publish dynamically

I am new to Reactive programming and Spring WebFlux. I want to make my App 1 publish Server Sent event through Flux and my App 2 listen on it continuously.

I want Flux publish on-demand (e.g. when something happens). All the example I found is to use Flux.interval to periodically publish event, and there seems no way to append/modify the content in Flux once it is created.

How can I achieve my goal? Or I am totally wrong conceptually.

Upvotes: 36

Views: 27982

Answers (2)

Oleh Dokuka
Oleh Dokuka

Reputation: 12184

Publish "dynamically" using FluxProcessor and FluxSink

One of the techniques to supply data manually to the Flux is using FluxProcessor#sink method as in the following example

@SpringBootApplication
@RestController
public class DemoApplication {

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        sink.next("Hello World #" + counter.getAndIncrement());
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return processor.map(e -> ServerSentEvent.builder(e).build());
    }
}

Here, I created DirectProcessor in order to support multiple subscribers, that will listen to the data stream. Also, I provided additional FluxProcessor#serialize which provide safe support for multiproducer (invocation from different threads without violation of Reactive Streams spec rules, especially rule 1.3). Finally, by calling "http://localhost:8080/send" we will see the message Hello World #1 (of course, only in case if you connected to the "http://localhost:8080" previously)

Update For Reactor 3.4

With Reactor 3.4 you have a new API called reactor.core.publisher.Sinks. Sinks API offers a fluent builder for manual data-sending which lets you specify things like the number of elements in the stream and backpressure behavior, number of supported subscribers, and replay capabilities:

@SpringBootApplication
@RestController
public class DemoApplication {

    final Sinks.Many sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());

        if (result.isFailure()) {
          // do something here, since emission failed
        }
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
    }
}

Note, message sending via Sinks API introduces a new concept of emission and its result. The reason for such API is the fact that the Reactor extends Reactive-Streams and has to follow the backpressure control. That said if you emit more signals than was requested, and the underlying implementation does not support buffering, your message will not be delivered. Therefore, the result of tryEmitNext returns the EmitResult which indicates if the message was sent or not.

Also, note, that by default Sinsk API gives a serialized version of Sink, which means you don't have to care about concurrency. However, if you know in advance that the emission of the message is serial, you may build a Sinks.unsafe() version which does not serialize given messages

Upvotes: 82

CheGola
CheGola

Reputation: 75

Just another idea, using EmitterProcessor as a gateway to flux

    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;

    public class MyEmitterProcessor {
        EmitterProcessor<String> emitterProcessor;

        public static void main(String args[]) {

            MyEmitterProcessor myEmitterProcessor = new MyEmitterProcessor();
            Flux<String> publisher = myEmitterProcessor.getPublisher();
            myEmitterProcessor.onNext("A");
            myEmitterProcessor.onNext("B");
            myEmitterProcessor.onNext("C");
            myEmitterProcessor.complete();

            publisher.subscribe(x -> System.out.println(x));

        }

        public Flux<String> getPublisher() {
            emitterProcessor = EmitterProcessor.create();
            return emitterProcessor.map(x -> "consume: " + x);
        } 

        public  void onNext(String nextString) {
            emitterProcessor.onNext(nextString);
        }

        public  void complete() {
            emitterProcessor.onComplete();
        }
    }

More info, see here from Reactor doc. There is a recommendation from the document itself that "Most of the time, you should try to avoid using a Processor. They are harder to use correctly and prone to some corner cases." BUT I don't know which kind of corner case.

Upvotes: 5

Related Questions