3dGrabber
3dGrabber

Reputation: 5074

Error Handling with Observable.Using and Observable.Retry

I am trying to combine Observable.Using with Observable.Retry

I have two kinds of errors that can occur:

  1. Fatal errors, that need disposal and subsequent recreation of the underlying IDisposable.
  2. Transient errors, that do NOT need disposal of the IDisposable.

I can use Retry to handle the first kind of errors.
But how do I handle transient errors?

Below a toyed-down example that hopefully illustrates the problem.
If possible, the solution should allow me to write the code in the same "railway oriented" manner.
The solution should work for both hot and cold observables.

var subscription = Observable
                  .Using(() => new Foo(), foo => foo.Observe())
                  .Select(i => ThrowIf(i, 2))  // fatal error
                  .Retry()                     // Foo should be disposed (works)
                  .Select(i => ThrowIf(i, 5))  // transient error
                  .Retry()                     // Foo should NOT be disposed here   
                  .Subscribe(Console.WriteLine);

Console.ReadLine();
subscription.Dispose();


class Foo : IDisposable
{
    public Foo()          => Console.WriteLine(nameof(Foo) + " created");
    public void Dispose() => Console.WriteLine(nameof(Foo) + " disposed");

    public IObservable<Int32> Observe()
    {
        return Observable
              .Interval(TimeSpan.FromSeconds(1))
              .Select(_ => DateTime.Now.Second % 10);
    }
}


Int32 ThrowIf(Int32 i, Int32 n)
{
    if (i == n)
    {
        Console.WriteLine("!" + i);
        throw new Exception();
    }

    return i;
}

Sample output:

1
!2
Foo disposed
Foo created
3
4
!5
Foo disposed     <= 
Foo created      <= this should not happen
6
7

Edit, clarify:

The order of the error sources matters:
fatal followed by transient.

In the real code the IDisposable is a UdpClient,
The fatal error source is UdpClient.ReceiveAsync() and
and transient is a function that parses the data received.

Upvotes: 1

Views: 211

Answers (1)

Theodor Zoulias
Theodor Zoulias

Reputation: 43515

I don't think that it's possible to solve this problem using a single railway. If you are OK with adding a branch, here is a solution:

var observable = Observable
    .Using(() => new Foo(), foo => foo.Observe()
        .Select(i => ThrowIf(i, 5))
        .Retry() // Foo should NOT be disposed
    )
    .Select(i => ThrowIf(i, 3))
    .Retry(); // Foo should be disposed

The branch inside the Using deals with transient errors, and the outer (main) railroad deals with fatal errors.

Upvotes: 1

Related Questions