Reputation: 23187
I'm creating several subscriptions.
I need to detect when they (all) have been completed in order to perform an action.
private IList<IDisposable> subscriptions = new List<IDisposable>();
private void CreateSubscriptions(IEnumerable<int> integers)
{
if (this.subscriptions.Any())
foreach (IDisposable subscription in this.subscriptions)
subscription.Dispose();
this.subscriptions.Clear();
for (int i = 0; i < 50; i++)
{
this.subscriptions.Add(
Observable.Create<int>(
observer =>
{
foreach (int i in integers)
observer.OnNext(i);
observer.Completed();
return System.Reactive.Disposables.Disposable.Empty;
}
)
.SubscribeOn(System.Reactive.Concurrency.TaskPoolScheduler.Default)
.ObserveOn(this.fuasTileControl)
.Subscribe(
// action
);
}
}
As you can see I'm creating 50 observables that informs about next value of an IEnumerable<int>
. This is an stuff example code.
I need to get when all subscriptions has ended (completed) in order to perform an action.
Any ideas?
Upvotes: 0
Views: 93
Reputation: 14350
The slightly-pedantic, but correct answer is that this is impossible: A subscription is represented by an IDisposable
which offers no external way to detect when it terminates (besides outright causing termination). More precisely, given the function you illustrated above to fill IList<IDisposable> subscriptions
, there is no mechanism to act/subscribe when all those subscriptions are disposed.
There is however a way to act on a collection of observables (as represented by IObservable
), and when those are completed.
Given either IEnumerable<IObservable> observables
or IObservable<IObservable> observables
, the following will work:
observables
.Merge()
.Subscribe(
item => { /*onNext handler*/ },
e => { /*onError handler*/ },
() => { /* onCompleted handler */});
You would want to put whatever action you wanted as the onCompleted
handler.
Upvotes: 1
Reputation: 29720
There might be better options but the one I know is using the Merge
operator. See http://reactivex.io/documentation/operators/merge.html.
You can then subscribe to this observable. If you need a blocking solution you can use Wait()
instead.
I am also interested in other solutions.
void Main()
{
var sequence = CreateSubscriptions( new[] {1,5,6});
sequence.Subscribe((i) => {}, () => { Console.WriteLine("all ready"); });
//sequence.Wait();
//Console.WriteLine("all ready");
}
private IList<IDisposable> subscriptions = new List<IDisposable>();
private IObservable<int> CreateSubscriptions(IEnumerable<int> integers)
{
if (this.subscriptions.Any())
foreach (IDisposable subscription in this.subscriptions)
subscription.Dispose();
this.subscriptions.Clear();
var sequences = new List<IObservable<int>>();
for (int ix = 0; ix < 5; ix++)
{
var sequence = Observable.Create<int>(
observer =>
{
foreach (int i in integers)
observer.OnNext(i);
observer.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty;
}
);
sequences.Add(sequence);
this.subscriptions.Add(sequence.Subscribe(nr =>
{
Console.WriteLine(nr);
}));
}
return Observable.Merge(sequences);
}
I've made some small modifications as well to make it compile.
Upvotes: 1