Reputation: 939
With what I have read so far, one can multicast a Flux to multiple subscribers using ConnectableFlux
using something similar to following:
Flux<Integer> integerFlux = Flux.range(1, 50)
.log();
ConnectableFlux<Integer> integerConnectableFlux = integerFlux.publish();
integerConnectableFlux.map(i -> i*2)
.subscribe(System.out::println);
integerConnectableFlux.map(i -> i*3)
.subscribe(System.out::println);
integerConnectableFlux.connect();
To my limited understanding of reactive streams, above code converts a cold publisher to a hot publisher.
I am working on scenario where I have multiple subscribers for a Mono
. How can I get a hot publisher out of a Mono?
Upvotes: 2
Views: 3591
Reputation: 939
Managed to multicast the mono using Mono.share()
operator:
Prepare a Mono which shares this Mono result similar to Flux.shareNext(). This will effectively turn this Mono into a hot task when the first Subscriber subscribes using subscribe() API. Further Subscriber will share the same Subscription and therefore the same result. It's worth noting this is an un-cancellable Subscription.
Example: Following code creates only one subscription on the publisher:
Mono<Integer> integerMono = Mono.just(2)
.log()
.share();
integerMono.map(i -> i+3)
.subscribe(System.out::println);
integerMono.map(i -> i*5)
.subscribe(System.out::println);
Output of above code is shown below (data is requested only once):
5
reactor.Mono.Just.1 - {} - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
reactor.Mono.Just.1 - {} - | request(unbounded)
reactor.Mono.Just.1 - {} - | onNext(2)
reactor.Mono.Just.1 - {} - | onComplete()
10
Upvotes: 3
Reputation: 6255
Have a look at Mono#cache()
operator:
Turn this Mono into a hot source and cache last emitted signals for further Subscriber. Completion and Error will also be replayed.
Upvotes: 2