Victor Grigoriu
Victor Grigoriu

Reputation: 479

Rx: subscribe with async function and ignore errors

I want to call an async function for each item in an observable. As answered here, the solution is to use SelectMany. However, if the async method throws, the subscription will terminate. I have the following solution, which seems to work:

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .Catch(Observable.Empty<string>()));

Is there a more idiomatic solution?

Upvotes: 4

Views: 913

Answers (2)

Enigmativity
Enigmativity

Reputation: 117175

There is a standard way to be able to observe the exceptions that occur in your RunAsync call, and that's using .Materialize().

The .Materialize() method turns an IObservable<T> sequence into a IObservable<Notification<T>> sequence where you can reason against the OnNext, OnError, and OnCompleted calls.

I wrote this query:

var obs = Observable.Range(0, 10);

obs
    .SelectMany(x =>
        Observable
            .FromAsync(() => RunAsync())
            .Materialize())
    .Where(x => x.Kind != NotificationKind.OnCompleted)
    .Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
    .Subscribe(x => x.Dump());

With this supporting code:

private int counter = 0;
private Random rnd = new Random();

private System.Threading.Tasks.Task<string> RunAsync()
{
    return System.Threading.Tasks.Task.Factory.StartNew(() =>
    {
        System.Threading.Interlocked.Increment(ref counter);
        if (rnd.NextDouble() < 0.3)
        {
            throw new Exception(counter.ToString());
        }
        return counter.ToString();
    });
}

When I run it I get this kind of output:

2
4
5
1!
6
7
3!
10
8!
9

Each of the lines ending in ! are calls to RunAsync that resulted in an exception.

Upvotes: 3

treze
treze

Reputation: 3299

You can also use OnErrorResumeNext.

obs.SelectMany(x => Observable
    .FromAsync(() => RunAsync())
    .OnErrorResumeNext(Observable.Empty<string>()));

Upvotes: 0

Related Questions