n0noob
n0noob

Reputation: 939

How to multicast a Mono to multiple subscribers in Project Reactor?

With what I have read so far, one can multicast a Flux to multiple subscribers using ConnectableFlux using something similar to following:

Flux<Integer> integerFlux = Flux.range(1, 50)
                .log();

ConnectableFlux<Integer> integerConnectableFlux = integerFlux.publish();

integerConnectableFlux.map(i -> i*2)
                .subscribe(System.out::println);

integerConnectableFlux.map(i -> i*3)
                .subscribe(System.out::println);

integerConnectableFlux.connect();

To my limited understanding of reactive streams, above code converts a cold publisher to a hot publisher. I am working on scenario where I have multiple subscribers for a Mono. How can I get a hot publisher out of a Mono?

Upvotes: 2

Views: 3591

Answers (2)

n0noob
n0noob

Reputation: 939

Managed to multicast the mono using Mono.share() operator:

Prepare a Mono which shares this Mono result similar to Flux.shareNext(). This will effectively turn this Mono into a hot task when the first Subscriber subscribes using subscribe() API. Further Subscriber will share the same Subscription and therefore the same result. It's worth noting this is an un-cancellable Subscription.

Example: Following code creates only one subscription on the publisher:

        Mono<Integer> integerMono = Mono.just(2)
                .log()
                .share();

        integerMono.map(i -> i+3)
                .subscribe(System.out::println);

        integerMono.map(i -> i*5)
                .subscribe(System.out::println);

Output of above code is shown below (data is requested only once):

5
reactor.Mono.Just.1 - {} - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
reactor.Mono.Just.1 - {} - | request(unbounded)
reactor.Mono.Just.1 - {} - | onNext(2)
reactor.Mono.Just.1 - {} - | onComplete()
10

Upvotes: 3

lkatiforis
lkatiforis

Reputation: 6255

Have a look at Mono#cache() operator:

Turn this Mono into a hot source and cache last emitted signals for further Subscriber. Completion and Error will also be replayed.

Upvotes: 2

Related Questions