Alex Netkachov
Alex Netkachov

Reputation: 13522

Rx: Wait for several observables to complete

I have list of operations to complete and I want to return an observable which is notified when all the observables are completed (returning status of operations will be the best):

foreach (var id in service.FetchItems().ToEnumerable().ToArray())
{
    service.Delete(id); // <- returns IObservable<Unit>
}
// something.Wait();

service.FetchItems() returns IObservable<string>, service.Delete(...) returns IObservable<Unit>

Is the following approach correct?

service.FetchItems().ForEachAsync(id => service.Delete(id)).ToObservable().Wait();

Upvotes: 5

Views: 6224

Answers (2)

Enigmativity
Enigmativity

Reputation: 117010

I would avoid all awaiting and tasks and just stick with plain RX for this.

Try this approach:

var query =
    from id in service.FetchItems()
    from u in service.Delete(id)
    select id;

query
    .ToArray()
    .Subscribe(ids =>
    {
        /* all fetches and deletes done now */
    });

The .ToArray() operator in Rx takes an IObservable<T> that returns zero or more T's and returns an IObservable<T[]> that returns a single array that contains zero or more T's only when the source observable completes.

Upvotes: 2

Lee
Lee

Reputation: 144106

ToEnumerable blocks waiting for the next element in the sequence. You could do:

Task delAllTask = service.FetchItems()
    .SelectMany(service.Delete)
    .ToTask();

then you can block on the task or continue asynchronously e.g.

delAllTask.Wait();
delAllTask.ContinueWith(...);

Upvotes: 0

Related Questions