Jordi

Reputation: 23187

Get when all subscriptions have ended

I'm creating several subscriptions.

I need to detect when they (all) have been completed in order to perform an action.

private IList<IDisposable> subscriptions = new List<IDisposable>();

private void CreateSubscriptions(IEnumerable<int> integers)
{
    if (this.subscriptions.Any())
        foreach (IDisposable subscription in this.subscriptions)
            subscription.Dispose();

    this.subscriptions.Clear();

    // The outer counter is named ix so it does not clash with the inner foreach variable i.
    for (int ix = 0; ix < 50; ix++)
    {
        this.subscriptions.Add(
            Observable.Create<int>(
                observer =>
                {
                    foreach (int i in integers)
                        observer.OnNext(i);

                    observer.OnCompleted();

                    return System.Reactive.Disposables.Disposable.Empty;
                }
            )
            .SubscribeOn(System.Reactive.Concurrency.TaskPoolScheduler.Default)
            .ObserveOn(this.fuasTileControl)
            .Subscribe(
                // action
            ));
    }
}

As you can see, I'm creating 50 observables, each of which emits the values of an IEnumerable<int>. This is just sample code.

I need to know when all subscriptions have ended (completed) so that I can perform an action.

Any ideas?

Upvotes: 0

Views: 93

Answers (2)

Shlomo

Reputation: 14350

The slightly-pedantic, but correct answer is that this is impossible: A subscription is represented by an IDisposable which offers no external way to detect when it terminates (besides outright causing termination). More precisely, given the function you illustrated above to fill IList<IDisposable> subscriptions, there is no mechanism to act/subscribe when all those subscriptions are disposed.

There is, however, a way to act when a collection of observables (each represented by an IObservable<T>) has completed.

Given either IEnumerable<IObservable<T>> observables or IObservable<IObservable<T>> observables, the following will work:

observables
    .Merge()
    .Subscribe(
        item => { /*onNext handler*/ }, 
        e =>    { /*onError handler*/ }, 
        () =>   { /* onCompleted handler */});

Put whatever action you need in the onCompleted handler; Merge invokes it only after every source observable has completed.
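Applied to the scenario in the question, this could look roughly like the sketch below. It assumes the System.Reactive package is referenced; BuildSources and MergeCompletionSketch are hypothetical names made up for illustration, and the UI-specific ObserveOn call from the question is left out so the example runs as a plain console program.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class MergeCompletionSketch
{
    // Hypothetical helper: builds `count` cold observables that each emit `integers`
    // on the task pool, similar to the Observable.Create calls in the question.
    static IEnumerable<IObservable<int>> BuildSources(IEnumerable<int> integers, int count) =>
        Enumerable.Range(0, count)
            .Select(_ => integers.ToObservable().SubscribeOn(TaskPoolScheduler.Default));

    static void Main()
    {
        var sources = BuildSources(new[] { 1, 5, 6 }, 50);

        using (var done = new ManualResetEventSlim())
        {
            // A single subscription to the merged sequence replaces the 50 separate ones.
            IDisposable subscription = sources
                .Merge()
                .Subscribe(
                    value => Console.WriteLine(value),
                    error =>
                    {
                        Console.WriteLine("failed: " + error.Message);
                        done.Set();
                    },
                    () =>
                    {
                        // Runs once, after every source has completed.
                        Console.WriteLine("all sequences completed");
                        done.Set();
                    });

            done.Wait();            // block only so the console process does not exit early
            subscription.Dispose();
        }
    }
}
```

Note that if any source signals an error, the merged sequence terminates with that error and the completion handler is not called, so the action may also need to run (or be skipped deliberately) in the onError handler.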

Upvotes: 1

Peter Bons

Reputation: 29720

There might be better options but the one I know is using the Merge operator. See http://reactivex.io/documentation/operators/merge.html.

You can then subscribe to the merged observable; its completion handler runs once all the source sequences have completed. If you need a blocking solution, you can use Wait() instead.

I am also interested in other solutions.

void Main()
{
    var sequence = CreateSubscriptions( new[] {1,5,6});

    sequence.Subscribe((i) => {}, () => { Console.WriteLine("all ready"); });

    //sequence.Wait();
    //Console.WriteLine("all ready");
}

private IList<IDisposable> subscriptions = new List<IDisposable>();

private IObservable<int> CreateSubscriptions(IEnumerable<int> integers)
{
    if (this.subscriptions.Any())
        foreach (IDisposable subscription in this.subscriptions)
            subscription.Dispose();

    this.subscriptions.Clear();
    var sequences = new List<IObservable<int>>();

    for (int ix = 0; ix < 5; ix++)
    {    
        var sequence = Observable.Create<int>(
                observer =>
                {
                    foreach (int i in integers)
                        observer.OnNext(i);

                    observer.OnCompleted();

                    return System.Reactive.Disposables.Disposable.Empty;
                }
            );
        sequences.Add(sequence);    
        this.subscriptions.Add(sequence.Subscribe(nr => 
        {  
            Console.WriteLine(nr); 
        }));
    }

    return Observable.Merge(sequences);
}

I've made some small modifications as well to make it compile.

Upvotes: 1
