SuperJMN
SuperJMN

Reputation: 13972

Exception handling in observable pipeline

I have created an observable that consists of an item being transformed to another by running an async method.

IObservable<Summary> obs = scanner.Scans
                    .SelectMany(b => GetAssignment(b))
                    .SelectMany(b => VerifyAssignment(b))
                    .SelectMany(b => ConfirmAssignmentData(b))
                    .SelectMany(b => UploadAsset(b))
                    .Select(assignment => new Summary())
                    .Catch(LogException());

I would like to make this fail-proof, so in case an exception is thrown during the processing, I should log the exception, but ignore the exception and resume with the next scan (the next item pushed by scanner.Scans)

The current code catches any exception, but the sequence finished as soon as an exception is thrown.

How can I make it "swallow" the exception (logging it), but to resume with the next item?

Upvotes: 5

Views: 1342

Answers (3)

Theodor Zoulias
Theodor Zoulias

Reputation: 43474

You could use the application-specific operator LogAndIgnoreError below:

/// <summary>Ensures that the source sequence will always complete successfully.
/// In case of failure the error is logged.</summary>
public static IObservable<T> LogAndIgnoreError<T>(this IObservable<T> source)
{
    return source.Catch((Exception error) =>
    {
        // Application-specific logging
        Console.WriteLine($"Log - {error.GetType().Name}: {error.Message}");
        return Observable.Empty<T>();
    });
}

You could then attach this operator to any sequence whose error you would like to ignore.

Usage example:

IObservable<Summary> obs = scanner.Scans
    .SelectMany(b => GetAssignment(b).LogAndIgnoreError())
    .SelectMany(b => VerifyAssignment(b).LogAndIgnoreError())
    .SelectMany(b => ConfirmAssignmentData(b).LogAndIgnoreError())
    .SelectMany(b => UploadAsset(b).LogAndIgnoreError())
    .Select(assignment => new Summary())
    .LogAndIgnoreError();

Upvotes: 1

Enigmativity
Enigmativity

Reputation: 117037

Rx is a functional paradigm so it's very useful to use a functional approach to solving this problem.

The answer is to introduce another monad that can cope with errors, like Nullable<T> can cope with integers having a null value, but in this case a class that can either represent a value or an exception.

public class Exceptional
{
    public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
    public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
    public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
}

public class Exceptional<T>
{
    public bool HasException { get; private set; }
    public Exception Exception { get; private set; }
    public T Value { get; private set; }

    public Exceptional(T value)
    {
        this.HasException = false;
        this.Value = value;
    }

    public Exceptional(Exception exception)
    {
        this.HasException = true;
        this.Exception = exception;
    }

    public Exceptional(Func<T> factory)
    {
        try
        {
            this.Value = factory();
            this.HasException = false;
        }
        catch (Exception ex)
        {
            this.Exception = ex;
            this.HasException = true;
        }
    }

    public override string ToString() =>
        this.HasException
            ? this.Exception.GetType().Name
            : (this.Value != null ? this.Value.ToString() : "null");
}


public static class ExceptionalExtensions
{
    public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);

    public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);

    public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
        value.SelectMany(t => Exceptional.From(() => m(t)));

    public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
        value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);

    public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
        value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
}

So, let's start by creating an Rx query that throws an exception.

IObservable<int> query =
    Observable
        .Range(0, 10)
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

If I run the observable I get this:

Normal Query

Let's transform this with with Exceptional and see how it allows us to continue processing when an error occurs.

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => x.Select(y => 5 - y))
        .Select(x => x.Select(y => 100 / y))
        .Select(x => x.Select(y => y + 5));

Now when I run it I get this:

Query with Exceptional

Now I could test each result, see if HasException is true and log each exception, meanwhile the observable continues.

Finally, it's easy to clean up the query to look almost the same as the original by introducing one further extension method.

    public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
        source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));

This combines observables and exceptionals into a single Select operator.

Now the query can look like this:

IObservable<Exceptional<int>> query =
    Observable
        .Range(0, 10)
        .Select(x => x.ToExceptional())
        .Select(x => 5 - x)
        .Select(x => 100 / x)
        .Select(x => x + 5);

I get the same result at before.


Finally, I could get this all working with query syntax by adding two more extension methods:

public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
    source.Select(t => k(t));

public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
    source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));

This allows:

IObservable<Exceptional<int>> query =
    from n in Observable.Range(0, 10)
    from x in n.ToExceptional()
    let a = 5 - x
    let b = 100 / a
    select b + 5;

Again, I get the same results as before.

Upvotes: 5

Shlomo
Shlomo

Reputation: 14350

The question presumes a fundamental misunderstanding: According to the Observable contract, a well-behaved observable terminates after an OnError notification. To your case, there is no, "just log and continue" option, because there's nothing to continue on. The observable that throws an exception via an OnError is done, kaput, finito, gone forever.

A comment mentioned Retry, which may be applicable: If you have an observable pipeline like so:

someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2
    .Subscribe(e => {});

Then the exception could occur in one of the operators, killing the pipeline, but the source could still be alive. Retry will then try to re-create a new pipeline with the same functions.

You can try to 'cheat' the Observable contract by using Materialize and Dematerialize, but you would be swimming upstream. The trick with cheating is to make sure that no part of the pipeline doesn't see a 'raw' OnError, because that operator will terminate. Rather Materialize turns an OnError into a Notification, which doesn't blow up. That would look like this:

Given a well-behaved pipeline like this:

var someHotSource = new Subject<int>();
var f = new Func<int, IObservable<int>>(i => Observable.Return(i));
var g = new Func<int, IObservable<int>>(i =>
{
    if(i % 13 == 0)
        return Observable.Throw<int>(new Exception());
    return Observable.Return(i);
});

var LogException = new Action<Exception>(e => Console.WriteLine("Exception"));
var p1 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SelectMany(e => g(e)) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

...you can cheat like this:

var p2 = someHotSource
    .SelectMany(e => f(e)) //operator1
    .SuspectSelectMany(e => g(e), LogException) //operator2: suspect
    .Subscribe(e => Console.WriteLine(e));

public static class X
{
    public static IObservable<Notification<T>> IgnoreOnCompleted<T>(this IObservable<Notification<T>> source)
    {
        return source
            .SelectMany(n => n.Kind == NotificationKind.OnCompleted
                ? Observable.Empty<Notification<T>>()
                : Observable.Return(n)
            );
    }
    
    public static IObservable<U> SuspectSelectMany<T, U>(this IObservable<T> source, Func<T, IObservable<U>> selector, Action<Exception> handler)
    {
        var x = source
            .Materialize()
            .SelectMany(e => selector(e.Value).Materialize().IgnoreOnCompleted()) //execute suspect selector, turn immediately into notifications
            .SelectMany(e =>
            {
                if (e.Kind == NotificationKind.OnError)
                {
                    handler(e.Exception);
                    return Observable.Empty<Notification<U>>();
                }
                else
                    return Observable.Return(e);
            }) //error logging/suppression
            .Dematerialize();
        return x;
    }
}

Then given the following runner code:

someHotSource.OnNext(1);
someHotSource.OnNext(12);
someHotSource.OnNext(13);
someHotSource.OnNext(15);

p1 will bomb. p2 will produce the following output:

1
12
Exception
15

Upvotes: 3

Related Questions