askoutaris
askoutaris

Reputation: 59

Rx Net - Handle errors and resubscribe to observable

I'm planning to replace .Net events with observables. I like the Linq way that every subscriber can filter and transform data in their own way.

Rx by default is disposing the subscription and closing the stream when an unhandled exception is thrown somewhere in the piping (eg inside select, where, subscribe). As a result, any next calls to onNext() will not end up on the subscription because it was disposed by the previous exception, and the listener which was subscribed will not be notified!

With classic .Net events this is not an issue. Every event invocation is independent of any other, meaning that even if a call throws an exception, the next call will have it's own chance to run successfully.

Ideally, I would like to use EventLooperScheduler so any call to onNext will not wait for subscribers to do their job.

I have ended up with the following code so far:

class Program
{
    static void Main(string[] args)
    {
        WriteLine("Main Thread {{THREAD_ID}}");

        RxEvent<int> myEvent = new RxEvent<int>(new EventLoopScheduler());

        IDisposable subscriptionA = myEvent.Stream
            .Where(x =>
            {
                //exception for testing purposes, this could be a null_reference, or a file_not_found, or a webrequest exception
                if (x.EventArgs == 2)
                    throw new ArgumentException();

                return true;
            })
            .Subscribe(x =>
            {
                //exception for testing purposes, this could be a null_reference, or a file_not_found, or a webrequest exception
                //comment out Where() exception to try this one
                if (x.EventArgs == 3)
                    throw new ArgumentException();

                WriteLine($"Subscriber A {{THREAD_ID}} Value: {x.EventArgs}");

                Thread.Sleep(500);
            },
            onError =>
            {
                var ex = onError;
            });

        IDisposable subscriptionB = myEvent.Stream.Subscribe(x =>
        {
            WriteLine($"Subscriber B {{THREAD_ID}} Value: {x.EventArgs}");
            Thread.Sleep(500);
        });

        for (var i = 1; i <= 7; i++)
        {
            myEvent.Invoke(null, i);
        }

        System.Console.WriteLine("END");
        System.Console.ReadLine();
    }

    private static void WriteLine(string str)
    {
        str = str.Replace("{THREAD_ID}", $"[Thread {Thread.CurrentThread.ManagedThreadId}]");
        System.Console.WriteLine(str);
    }
}

public class RxEvent<T>
{
    EventHandler<T> _Event;
    IObservable<EventPattern<T>> _Observable;
    IScheduler _Scheduler;

    public RxEvent()
        : this(CurrentThreadScheduler.Instance)
    {
    }

    public RxEvent(IScheduler scheduler)
    {
        _Scheduler = scheduler;
    }

    public IObservable<EventPattern<T>> Stream
    {
        get
        {
            if (_Observable == null)
                _Observable = Observable.FromEventPattern<T>(h => _Event += h, h => _Event -= h).ObserveOn(_Scheduler);

            return _Observable;
        }
    }

    public void Invoke(object sender, T args)
    {
        _Event?.Invoke(sender, args);
    }
}

When the first exception is thrown, Where's implementation catches it and just disposes the subscription. In contrast, when the second exception (inside subscribe) is thrown, no one is handled it, and makes thread to stop.

My question is, is threre any way to handle these situations and keep the subscription alive in order to proccess any upcoming invocations, even if some of them throw an unhandled exception?

In my scenarion I get the following output when the first exception is throw. (when the second exception is throw the thread crashes):

Main Thread {[Thread 1]}
END
Subscriber A [Thread 4] Value: 1
Subscriber B [Thread 4] Value: 1
Subscriber B [Thread 4] Value: 2
Subscriber B [Thread 4] Value: 3
Subscriber B [Thread 4] Value: 4
Subscriber B [Thread 4] Value: 5
Subscriber B [Thread 4] Value: 6
Subscriber B [Thread 4] Value: 7

Instead I would like to get:

Main Thread {[Thread 1]}
END
Subscriber A [Thread 4] Value: 1
Subscriber B [Thread 4] Value: 1
Subscriber B [Thread 4] Value: 2
Subscriber B [Thread 4] Value: 3
Subscriber A [Thread 4] Value: 4
Subscriber B [Thread 4] Value: 4
Subscriber A [Thread 4] Value: 5
Subscriber B [Thread 4] Value: 5
Subscriber A [Thread 4] Value: 6
Subscriber B [Thread 4] Value: 6
Subscriber A [Thread 4] Value: 7
Subscriber B [Thread 4] Value: 7

(Value 2 and 3 throws an exception for subscriberA so nothing should be printed)

A relative resource I found is Rx - can/should I replace .NET events with Observables?

Upvotes: 1

Views: 2592

Answers (1)

Krzysztof Skowronek
Krzysztof Skowronek

Reputation: 2946

Just one simple rule: no exceptions in explicit Subscribe.

Use Do instead, that way you can also use Retry after it to restart the subscription.

As shown in this answer, there is also an overload of Do that takes OnError handler:

source
    .Do(_ => { 
              throw new SomeException();
        }, 
            exception => Console.WriteLine(exception.ToString()) // log exceptions from source stream,
            () => {})
    .Retry()
    .Subscribe(
        l => Console.WriteLine($"OnNext {l}"),
        //      exception => Console.WriteLine(exception.ToString()), // Would be logging this in production
        () => Console.WriteLine("OnCompleted")
    );

You can throw Do().Retry() at the end and it will work and you won't miss any notifications. Just remember to dispose the subscriptions, otherwise it will wait for OnCompleted signal.

Upvotes: 2

Related Questions