Iain
Iain

Reputation: 2550

Combine/merge an unknown number of observables together as they are created

What I would like to do is:

Questions:

Upvotes: 2

Views: 221

Answers (2)

Brandon
Brandon

Reputation: 39182

James's answer (use Subject and Merge) captures the essence of the question. This answer offers a pattern that I've found useful in this situation (based on your comments to James's answer).

Essentially the pattern is to have your worker's expose an IObservable that the caller will subscribe to before calling DoWork. But this sort of API (call A before calling B) is problematic because it introduces temporal coupling.

To eliminate the temporal coupling, you end up turning your worker itself into an cold Observable that implicitly calls DoWork when the caller subscribes. Once you realize the power of cold observables and the ability to use Observable.Create to take action when an observer subscribes, the sky's the limit on the Rx chains you can create without ever needing to reach for a Subject. Here's an example based on your original code.

Worker is simple. It just subscribes to incoming1 and to Worker2. Worker2 is slightly more complex. It subscribes to incoming2, performs some additional work, then finally subscribes to WorkerN.

All the while maintaining the correct OnError, OnCompleted logic which your original code example fails to do. Meaning that the observable stream that Main sees does not Complete until all of the incoming streams and work streams complete. But Main fails as soon as any of the incoming streams or work streams fails. Your code example with multiple calls to Subscribe(someSubject) would cause the Subject to complete (and thus Main's incoming stream to complete) as soon as any of the incoming streams completes.

public class Worker1
{
    public IObservable<string> UpdateEvents { get; private set; };

    public Worker1()
    {
        // Each time someone subscribes, create a new worker2 and subscribe to the hot events as well as whatever worker2 produces.
        UpdateEvents = Observable.Create(observer =>
        {
            var worker2 = new Worker2();
            return incoming1.Merge(worker2.UpdateEvents).Subscribe(observer);
        });
    }
}

public class Worker2
{
    public IObservable<string> UpdateEvents { get; private set; };

    public Worker2()
    {
        // Each time someone subscribes, create a new worker and subscribe to the hot events as well as whatever worker2 produces.
        UpdateEvents = Observable.Create(observer =>
        {
            // maybe this version needs to do some stuff after it has subscribed to incoming2 but before it subscribes to workerN:
            var doWorkThenSubscribeToWorker = Observable.Create(o =>
            {
                DoWork(o);
                var worker = new WorkerN();
                return worker.UpdateEvents.Subscribe(o);
            }

            return incoming2.Merge(doWorkThenSubscribeToWorker).Subscribe(observer);
        });
    }

    private void DoWork(IObserver<string> observer)
    {
        // do some work
        observer.OnNext("result of work");
    }
}


void Main()
{
    var worker = new Worker();
    worker.UpdateEvents.Do(x => Console.WriteLine()).Wait();
}

Upvotes: 3

James World
James World

Reputation: 29776

It's hard to follow exactly what you are asking for - a small but complete program would help here I think.

That said, there's nothing wrong with using a Subject to introduce input into the Rx pipeline - there's a lot written about that on StackOverflow and elsewhere, so I won't rehash it.

Going just by the title of your question, I wonder does the following suit your purpose?

Combining a dynamic number of streams

To do this you can use Merge on a stream of streams. Your streams must all be of the same type - if they are not, you can create a suitable container type and project them into that type using Select. For simplicity, I will assume the unified type is long.

To start create a container for the streams:

var container = new Subject<IObservable<long>>();

Then combine the contained streams:

var combined = container.Merge();

Subscribe to combined to consume the results in the usual way, and dispose the subscription to unsubscribe from all streams at once.

You can then add streams as they are created like this:

// assume we got this from somewhere - e.g. a "worker" factory function
// Observable.Create may well be helpful to create an observable
// that initiates getting data from a network connection upon its subscription
IObservable<long> someNewStream;

// add the someNewStream to the container (it will be subscribed to once added)
container.OnNext(someNewStream);

Example use

// dump out the combined results to the console,
// IRL you would subscribe to this to process the results
var subscription = combined.Subscribe(Console.WriteLine);

// add a stream of longs
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));    
Console.WriteLine("Stream 1 added");
Console.ReadLine();

// add another stream
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));    
Console.WriteLine("Step 2");
Console.ReadLine();

// this will unsubscribe from all the live streams
subscription.Dispose();

Upvotes: 1

Related Questions