Reputation: 479
I want to call an async function for each item in an observable. As answered here, the solution is to use SelectMany
. However, if the async method throws, the subscription will terminate. I have the following solution, which seems to work:
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.Catch(Observable.Empty<string>()));
Is there a more idiomatic solution?
Upvotes: 4
Views: 913
Reputation: 117175
There is a standard way to be able to observe the exceptions that occur in your RunAsync
call, and that's using .Materialize()
.
The .Materialize()
method turns an IObservable<T>
sequence into a IObservable<Notification<T>>
sequence where you can reason against the OnNext
, OnError
, and OnCompleted
calls.
I wrote this query:
var obs = Observable.Range(0, 10);
obs
.SelectMany(x =>
Observable
.FromAsync(() => RunAsync())
.Materialize())
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Select(x => x.HasValue ? x.Value : (x.Exception.Message + "!"))
.Subscribe(x => x.Dump());
With this supporting code:
private int counter = 0;
private Random rnd = new Random();
private System.Threading.Tasks.Task<string> RunAsync()
{
return System.Threading.Tasks.Task.Factory.StartNew(() =>
{
System.Threading.Interlocked.Increment(ref counter);
if (rnd.NextDouble() < 0.3)
{
throw new Exception(counter.ToString());
}
return counter.ToString();
});
}
When I run it I get this kind of output:
2
4
5
1!
6
7
3!
10
8!
9
Each of the lines ending in !
are calls to RunAsync
that resulted in an exception.
Upvotes: 3
Reputation: 3299
You can also use OnErrorResumeNext.
obs.SelectMany(x => Observable
.FromAsync(() => RunAsync())
.OnErrorResumeNext(Observable.Empty<string>()));
Upvotes: 0