Brian

Reputation: 338

Dynamic set of publishers all emitting through the same Flux

I am trying to build a kind of hub service that emits through a hot Flux (output), while also letting callers register and unregister Flux producers/publishers (input).

I know I can do something like:

    class Hub<T> {
        /**
         * @return unregister function
         */
        Function<Void, Void> registerProducer(final Flux<T> flux) { ... }

        Disposable subscribe(Consumer<? super T> consumer) {
            if (out == null) { 
                // obviously this will not work!
                out = Flux.merge(producer1, producer2, ...).share();
            }
            return out.subscribe(consumer);
        }
    }

... but as these "producers" are registered and unregistered, how do I add a new Flux source to the Flux that is already subscribed to, or remove an unregistered source from it?

TIA!

Upvotes: 1

Views: 430

Answers (1)

Michael Berry

Reputation: 72254

Flux is immutable by design, so as you've implied in the question, there's no way to just "update" an existing Flux in situ.

Usually I'd recommend avoiding using a Processor directly. However, this is one of the (rare-ish) cases where a Processor is probably the only sane option, since you essentially want to be publishing elements dynamically based on the producers that you're registering. Something similar to:

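    import reactor.core.Disposable;
    import reactor.core.publisher.DirectProcessor;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxProcessor;
    import reactor.core.publisher.FluxSink;

    class Hub<T> {

        private final FluxProcessor<T, T> processor;
        private final FluxSink<T> sink;

        public Hub() {
            // serialize() makes it safe for several producers to emit concurrently
            this.processor = DirectProcessor.<T>create().serialize();
            this.sink = processor.sink();
        }

        // Forwards every element of the given Flux into the shared processor.
        public Disposable registerProducer(Flux<T> flux) {
            return flux.subscribe(sink::next);
        }

        // Hot output: subscribers only see elements emitted after they subscribe.
        public Flux<T> read() {
            return processor;
        }
    }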

If you want to remove a producer, then you can keep track of the Disposable returned from registerProducer() and call dispose() on it when done.
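Note that `DirectProcessor` and `FluxProcessor` have been deprecated since Reactor 3.4 in favour of the `Sinks` API. As a rough sketch of the same idea on newer versions (assuming reactor-core 3.4+ is on the classpath; `SinkHub`, `SinkHubDemo`, and `emit` are just illustrative names, not anything from the library or the question), something along these lines should behave similarly, including unregistering via `dispose()`:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Sketch only: SinkHub and its method names are illustrative.
class SinkHub<T> {

    // Multicast sink without buffering: elements emitted while nobody is
    // subscribed are dropped, much like DirectProcessor.
    private final Sinks.Many<T> sink = Sinks.many().multicast().directBestEffort();

    // Subscribes to the producer and forwards its elements into the shared sink.
    // Disposing the returned handle unregisters the producer.
    public Disposable registerProducer(Flux<T> flux) {
        return flux.subscribe(this::emit);
    }

    // Producers may run on different threads and tryEmitNext rejects
    // concurrent calls, so emissions are serialized with a lock here.
    // The emit result is ignored in this sketch.
    private synchronized void emit(T value) {
        sink.tryEmitNext(value);
    }

    public Flux<T> read() {
        return sink.asFlux();
    }
}

class SinkHubDemo {

    static List<String> run() {
        SinkHub<String> hub = new SinkHub<>();
        List<String> received = new ArrayList<>();
        hub.read().subscribe(received::add);

        // Two hand-driven producers standing in for real sources.
        Sinks.Many<String> a = Sinks.many().multicast().directBestEffort();
        Sinks.Many<String> b = Sinks.many().multicast().directBestEffort();
        Disposable regA = hub.registerProducer(a.asFlux());
        hub.registerProducer(b.asFlux());

        a.tryEmitNext("a1");
        b.tryEmitNext("b1");
        regA.dispose();        // unregister producer a
        a.tryEmitNext("a2");   // no longer forwarded to the hub
        b.tryEmitNext("b2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a1, b1, b2]
    }
}
```

The `synchronized` wrapper is one simple way to serialize emissions from several producers. Neither version handles a producer that terminates with an error, so in real code you would probably want to pass an error consumer to `subscribe` as well.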

Upvotes: 1
