Reputation: 2550
What I would like to do is:
DoWork
) which as part of it's work will subscribe to multiple hot inputs through multiple Worker
classesDoWork
subscribes toDoWork
has completed.Questions:
Subject
s the correct way to do this? It feels like there should be a better way?How to ensure that once the subscription in Main
is disposed, all of the incomingX
subscriptions are also disposed - i.e. Main
should control the lifecycle of all the subscriptions.
void Main()
{
var worker = new Worker();
using (worker.UpdateEvents.Subscribe(x => Console.WriteLine()))
{
worker.DoWork();
}
}
public class Worker1
{
private readonly Subject<string> updateEvents = new Subject<string>();
public IObservable<string> UpdateEvents { get { return updateEvents; } }
public void DoWork()
{
// Do some work
// subscribe to a hot observable (events coming in over the network)
incoming1.Subscribe(updateEvents);
var worker2 = new Worker2();
worker2.UpdateEvents.Subscribe(updateEvents);
worker2.DoWork();
}
}
public class Worker2
{
private readonly Subject<string> updateEvents = new Subject<string>();
public IObservable<string> UpdateEvents { get { return updateEvents; } }
public void DoWork()
{
// Do some work
// subscribe to some more events
incoming2.Subscribe(updateEvents);
var workerN = new WorkerN();
workerN.UpdateEvents.Subscribe(updateEvents);
workerN.DoWork();
}
}
Upvotes: 2
Views: 221
Reputation: 39182
James's answer (use Subject
and Merge
) captures the essence of the question. This answer offers a pattern that I've found useful in this situation (based on your comments to James's answer).
Essentially the pattern is to have your worker's expose an IObservable
that the caller will subscribe to before calling DoWork
. But this sort of API (call A before calling B) is problematic because it introduces temporal coupling.
To eliminate the temporal coupling, you end up turning your worker itself into an cold Observable that implicitly calls DoWork
when the caller subscribes. Once you realize the power of cold observables and the ability to use Observable.Create
to take action when an observer subscribes, the sky's the limit on the Rx chains you can create without ever needing to reach for a Subject
. Here's an example based on your original code.
Worker
is simple. It just subscribes to incoming1
and to Worker2
.
Worker2
is slightly more complex. It subscribes to incoming2
, performs some additional work, then finally subscribes to WorkerN
.
All the while maintaining the correct OnError
, OnCompleted
logic which your original code example fails to do. Meaning that the observable stream that Main
sees does not Complete
until all of the incoming streams and work streams complete. But Main
fails as soon as any of the incoming streams or work streams fails. Your code example with multiple calls to Subscribe(someSubject)
would cause the Subject
to complete (and thus Main's incoming stream to complete) as soon as any of the incoming
streams completes.
public class Worker1
{
public IObservable<string> UpdateEvents { get; private set; };
public Worker1()
{
// Each time someone subscribes, create a new worker2 and subscribe to the hot events as well as whatever worker2 produces.
UpdateEvents = Observable.Create(observer =>
{
var worker2 = new Worker2();
return incoming1.Merge(worker2.UpdateEvents).Subscribe(observer);
});
}
}
public class Worker2
{
public IObservable<string> UpdateEvents { get; private set; };
public Worker2()
{
// Each time someone subscribes, create a new worker and subscribe to the hot events as well as whatever worker2 produces.
UpdateEvents = Observable.Create(observer =>
{
// maybe this version needs to do some stuff after it has subscribed to incoming2 but before it subscribes to workerN:
var doWorkThenSubscribeToWorker = Observable.Create(o =>
{
DoWork(o);
var worker = new WorkerN();
return worker.UpdateEvents.Subscribe(o);
}
return incoming2.Merge(doWorkThenSubscribeToWorker).Subscribe(observer);
});
}
private void DoWork(IObserver<string> observer)
{
// do some work
observer.OnNext("result of work");
}
}
void Main()
{
var worker = new Worker();
worker.UpdateEvents.Do(x => Console.WriteLine()).Wait();
}
Upvotes: 3
Reputation: 29776
It's hard to follow exactly what you are asking for - a small but complete program would help here I think.
That said, there's nothing wrong with using a Subject
to introduce input into the Rx pipeline - there's a lot written about that on StackOverflow and elsewhere, so I won't rehash it.
Going just by the title of your question, I wonder does the following suit your purpose?
To do this you can use Merge
on a stream of streams. Your streams must all be of the same type - if they are not, you can create a suitable container type and project them into that type using Select
. For simplicity, I will assume the unified type is long
.
To start create a container for the streams:
var container = new Subject<IObservable<long>>();
Then combine the contained streams:
var combined = container.Merge();
Subscribe to combined to consume the results in the usual way, and dispose the subscription to unsubscribe from all streams at once.
You can then add streams as they are created like this:
// assume we got this from somewhere - e.g. a "worker" factory function
// Observable.Create may well be helpful to create an observable
// that initiates getting data from a network connection upon its subscription
IObservable<long> someNewStream;
// add the someNewStream to the container (it will be subscribed to once added)
container.OnNext(someNewStream);
// dump out the combined results to the console,
// IRL you would subscribe to this to process the results
var subscription = combined.Subscribe(Console.WriteLine);
// add a stream of longs
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.WriteLine("Stream 1 added");
Console.ReadLine();
// add another stream
container.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.WriteLine("Step 2");
Console.ReadLine();
// this will unsubscribe from all the live streams
subscription.Dispose();
Upvotes: 1