Reputation: 13972
I have created an observable that consists of an item being transformed to another by running an async method.
IObservable<Summary> obs = scanner.Scans
    .SelectMany(b => GetAssignment(b))
    .SelectMany(b => VerifyAssignment(b))
    .SelectMany(b => ConfirmAssignmentData(b))
    .SelectMany(b => UploadAsset(b))
    .Select(assignment => new Summary())
    .Catch(LogException());
I would like to make this fail-proof: if an exception is thrown during processing, I want to log it, ignore it, and resume with the next scan (the next item pushed by scanner.Scans).
The current code catches any exception, but the sequence finishes as soon as an exception is thrown.
How can I make it "swallow" the exception (logging it) and resume with the next item?
Upvotes: 5
Views: 1342
Reputation: 43474
You could use the application-specific operator LogAndIgnoreError below:
/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
    return source.Catch((Exception error) =>
    {
        // Application-specific logging
        Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
        return Observable.Empty<T>();
    });
}
You could then attach this operator to any sequence whose error you would like to ignore.
Usage example:
IObservable<Summary> obs = scanner.Scans
    .SelectMany(b => GetAssignment(b).LogAndIgnoreError())
    .SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
    .SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
    .SelectMany(b => UploadAsset(b).LogAndIgnoreError())
    .Select(assignment => new Summary())
    .LogAndIgnoreError();
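To see the effect in isolation, here is a minimal sketch, assuming the System.Reactive package is referenced. The `Step` method is a hypothetical stand-in for a step such as GetAssignment, and this variant of the operator takes the log sink as a parameter (an assumption made only so the demo can record what was logged).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class LogAndIgnoreDemo
{
    // Same shape as the operator above, but the log sink is injectable (assumption for the demo).
    public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source, Action<string> log) =>
        source.Catch((Exception error) =>
        {
            log($"Log - {error.GetType().Name}: {error.Message}");
            return Observable.Empty<T>();
        });

    // Hypothetical processing step: fails for the value 3, succeeds otherwise.
    public static IObservable<int> Step(int scan) =>
        scan == 3
            ? Observable.Throw<int>(new InvalidOperationException("bad scan 3"))
            : Observable.Return(scan * 10);

    public static List<string> Run()
    {
        var output = new List<string>();
        Observable.Range(1, 5)
            // The error is handled on the inner sequence, so the outer one never sees it.
            .SelectMany(scan => Step(scan).LogAndIgnoreError(output.Add))
            .Subscribe(
                value => output.Add($"OnNext: {value}"),
                () => output.Add("OnCompleted"));
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The point of the sketch is the placement: the operator is attached to the inner sequence returned inside SelectMany, so a failure ends only that inner sequence and the outer one keeps receiving scans.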
Upvotes: 1
Reputation: 117037
Rx is a functional paradigm so it's very useful to use a functional approach to solving this problem.
The answer is to introduce another monad that can cope with errors, much as Nullable<T> lets an integer have a null value; in this case, a class that can represent either a value or an exception.
public class Exceptional
{
    public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
    public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
    public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}

public class Exceptional<T>
{
    public bool HasException { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    public Exceptional(T value)
    {
        this.HasException = false;
        this.Value = value;
    }

    public Exceptional(Exception exception)
    {
        this.HasException = true;
        this.Exception = exception;
    }

    public Exceptional(Func<T> factory)
    {
        try
        {
            this.Value = factory();
            this.HasException = false;
        }
        catch (Exception ex)
        {
            this.Exception = ex;
            this.HasException = true;
        }
    }

    public override string ToString() =>
        this.HasException
            ? this.Exception.GetType().Name
            : (this.Value != null ? this.Value.ToString() : "null");
}

public static class ExceptionalExtensions
{
    public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);

    public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);

    public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
        value.SelectMany(t => Exceptional.From(() => m(t)));

    public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
        value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);

    public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
        value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}
So, let's start by creating an Rx query that throws an exception.
IObservable<int> query =
    Observable
        .Range(0, 10)
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);
If I run the observable I get this:
Let's transform this with Exceptional and see how it allows us to continue processing when an error occurs.
IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => x.Select(y => 5 - y))
        .Select(x => x.Select(y => 100 / y))
        .Select(x => x.Select(y => y + 5));
Now when I run it I get this:
Now I could test each result, see if HasException is true, and log each exception, while the observable continues.
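As a rough illustration of that check-and-log step, the sketch below subscribes to such a query and records either the value or the exception type for each element. It assumes the System.Reactive package, and to stay self-contained it uses a trimmed-down `Exceptional<T>` with a static `Of` factory and an instance `Select` (both are simplifications for this demo, not the full class from this answer).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Trimmed-down stand-in for the Exceptional<T> idea, just enough for this demo.
public sealed class Exceptional<T>
{
    public bool HasException => Exception != null;
    public Exception Exception { get; }
    public T Value { get; }

    private Exceptional(T value, Exception exception)
    {
        Value = value;
        Exception = exception;
    }

    public static Exceptional<T> Of(T value) => new Exceptional<T>(value, null);

    // Applies f unless an error was already captured; captures anything f throws.
    public Exceptional<U> Select<U>(Func<T, U> f)
    {
        if (HasException) return new Exceptional<U>(default(U), Exception);
        try { return Exceptional<U>.Of(f(Value)); }
        catch (Exception ex) { return new Exceptional<U>(default(U), ex); }
    }
}

public static class ExceptionalDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        Observable.Range(0, 10)
            .Select(x => Exceptional<int>.Of(x))
            .Select(x => x.Select(y => 5 - y))
            .Select(x => x.Select(y => 100 / y)) // divides by zero when the source value is 5
            .Select(x => x.Select(y => y + 5))
            .Subscribe(
                r => output.Add(r.HasException
                    ? $"Logged: {r.Exception.GetType().Name}"
                    : $"Value: {r.Value}"),
                () => output.Add("OnCompleted"));
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Because the failure travels as an ordinary element, the observable itself never raises OnError, so the elements after the division by zero are still delivered and the sequence completes normally.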
Finally, it's easy to clean up the query to look almost the same as the original by introducing one further extension method.
public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
    source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));
This combines observables and exceptionals into a single Select operator.
Now the query can look like this:
IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);
I get the same result as before.
Finally, I could get this all working with query syntax by adding two more extension methods:
public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
    source.Select(t => k(t));

public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
    source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
This allows:
IObservable<Exceptional<int>> query =
    from n in Observable.Range(0, 10)
    from x in n.ToExceptional()
    let a = 5 - x
    let b = 100 / a
    select b + 5;
Again, I get the same results as before.
Upvotes: 5
Reputation: 14350
The question presumes a fundamental misunderstanding: according to the Observable contract, a well-behaved observable terminates after an OnError notification. In your case, there is no "just log and continue" option, because there is nothing to continue on. The observable that signals an exception via OnError is done, kaput, finito, gone forever.
A comment mentioned Retry, which may be applicable. Suppose you have an observable pipeline like so:
someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2
    .Subscribe(e => {});
Then the exception could occur in one of the operators, killing the pipeline, but the source could still be alive. Retry will then try to re-create a new pipeline with the same functions.
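A minimal sketch of the Retry approach, assuming the System.Reactive package. The failing selector `g` and the use of Do for logging are illustrative choices, not something the question prescribes; the log is collected in a list so the behaviour can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RetryDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        var someHotSource = new Subject<int>();

        // Illustrative suspect operator: fails for multiples of 13.
        Func<int, IObservable<int>> g = i =>
            i % 13 == 0
                ? Observable.Throw<int>(new Exception("unlucky"))
                : Observable.Return(i);

        using (someHotSource
            .SelectMany(e => g(e))
            // Log the error before Retry swallows it and resubscribes.
            .Do(_ => { }, ex => output.Add("Exception: " + ex.Message))
            .Retry()
            .Subscribe(e => output.Add(e.ToString())))
        {
            someHotSource.OnNext(1);
            someHotSource.OnNext(12);
            someHotSource.OnNext(13); // kills the current pipeline; Retry builds a new one
            someHotSource.OnNext(15);
        }

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

This only behaves like "log and continue" because the source is hot: resubscribing to a Subject picks up from the next item. With a cold source, Retry would restart the sequence from its beginning, and an unconditional Retry() will loop forever if the source fails on every subscription.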
You can try to 'cheat' the Observable contract by using Materialize and Dematerialize, but you would be swimming upstream. The trick with cheating is to make sure that no part of the pipeline sees a 'raw' OnError, because any operator that does will terminate. Instead, Materialize turns an OnError into a Notification, which doesn't blow up. That would look like this:
Given a well-behaved pipeline like this:
var someHotSource = new Subject<int>();
var f = new Func<int, IObservable<int>>(i => Observable.Return(i));
var g = new Func<int, IObservable<int>>(i =>
{
    if (i % 13 == 0)
        return Observable.Throw<int>(new Exception());
    return Observable.Return(i);
});
var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));
...you can cheat like this:
var p2 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SuspectSelectMany(e => g(e), LogException) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));
public static class X
{
    public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
    {
        return source
            .SelectMany(n => n.Kind == NotificationKind.OnCompleted
                ? Observable.Empty<Notification<T>>()
                : Observable.Return(n)
            );
    }

    public static IObservable<U> SuspectSelectMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> selector, Action<Exception> handler)
    {
        // The source itself is not materialized: reading Value on its OnCompleted
        // notification would throw, so source completion is left to pass through as-is.
        var x = source
            .SelectMany(e => selector(e).Materialize().IgnoreOnCompleted()) //execute suspect selector, turn immediately into notifications
            .SelectMany(e =>
            {
                if (e.Kind == NotificationKind.OnError)
                {
                    handler(e.Exception);
                    return Observable.Empty<Notification<U>>();
                }
                else
                    return Observable.Return(e);
            }) //error logging/suppression
            .Dematerialize();
        return x;
    }
}
Then given the following runner code:
someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);
p1 will bomb. p2 will produce the following output:
1
12
Exception
15
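To see what Materialize does on its own, here is a small sketch, assuming the System.Reactive package. It materializes a sequence that emits one value and then fails, and records the kind of each notification; the onError handler is included only to show that it is not invoked.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class MaterializeDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        Observable.Return(1)
            .Concat(Observable.Throw<int>(new Exception("boom")))
            .Materialize() // OnNext/OnError/OnCompleted become ordinary elements
            .Subscribe(
                n => output.Add(n.Kind.ToString()),
                ex => output.Add("raw OnError"), // not expected to run
                () => output.Add("OnCompleted"));
        return output;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

The failure arrives as an element whose Kind is OnError, and the materialized sequence then completes normally. That is why downstream operators survive it, and why the error has to be filtered out before Dematerialize turns notifications back into real signals.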
Upvotes: 3