Alexander Groß

Reputation: 10458

In Rx, how do I ensure no notifications are lost due to exceptions?

I want to make sure my Rx notifications are not lost when they're processed with my Consumer's Do delegate. I have a Producer that generates messages that I need to process, and processing of a message must be retried whenever it fails.

Desired marble diagram:

Producer   1 2 3 4 5
Consumer   1 X 2 3 X X 4 5
Downstream 1   2 3     4 5

The standard Retry operator won't help here, because it resubscribes to the Producer after an error. The notification whose processing failed is lost, and the sequence continues with the next notification.

Retry marble diagram:

Producer  1 2 3 4 5
Consumer  1 X 3 X 5

So far, I have this code, but it doesn't seem right to me:

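static void RetryWithBacklog()
{
  var data = Enumerable.Range(1, 100).ToObservable();
  // Failed notifications are pushed here so that they are processed again.
  // The element type must match data (int), otherwise Merge does not compile.
  var backlog = new Subject<int>();

  backlog
    .Merge(data)
    .Retry()
    .Do(l =>
    {
      try
      {
        ProcessNotification(l);
      }
      catch (Exception)
      {
        // Re-queue the notification instead of letting the error reach Retry.
        backlog.OnNext(l);
      }
    })
    .Subscribe();

  Console.ReadKey();
}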

(Full code sample)

Background

The Do operation will perform a network request based on the Producer's notifications. These requests might fail, so I need to retry them without losing the information to be transmitted. Of course I could blindly wrap the network request in a while loop with try/catch, but the requests should not happen as long as the network connection is known to be down (see this question).

Solution

static void RetryBeforePassingDownstream()
{
  var data = Enumerable.Range(1, 100).ToObservable();

  data
    .Replay(l => l.SelectMany(ProcessNotification).Retry(), 1)
    .Subscribe(Downstream);

  Console.ReadKey();
}

static IObservable<int> ProcessNotification(int notification)
{
  Console.WriteLine("process: {0}", notification);
  // either:
  // throw new Exception("error");

  // or:
  return Observable.Return(notification);
}

static void Downstream(int i)
{
  Console.WriteLine("downstream: {0}", i);
}

Upvotes: 1

Views: 157

Answers (1)

Dave Sexton

Reputation: 2652

The point of the Retry operator is that it works on cold observables by resubscribing, thus causing subscription side effects each time that it "retries". For instance, your observable might send a web request each time that an observer subscribes. In your example, the Retry operator doesn't make any sense, especially considering that Interval never calls OnError.

Retry semantics can technically exist separately in the observable and the observer. In your situation, couldn't you simply call ProcessNotification until it succeeds in the Do operator?

Or is your problem simply that downstream observers won't observe the notification when Do throws? I guess that's not what you meant, because in that case you could simply swallow the exception.

Or is your problem that there's a chance that the notification will change in some way or get filtered out by upstream operators if the notification were to be replayed? In that case you could probably just use Do followed by Catch and recursion to achieve the same thing without requiring a Subject, but I can't figure out why you'd ever really want to do it.
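To make the "call ProcessNotification until it succeeds" idea concrete, here is a minimal sketch of a blocking retry loop that could be called from inside Do. The helper name InvokeUntilSuccess and its maxAttempts and delay parameters are assumptions made up for this illustration; they are not part of Rx.

```csharp
using System;
using System.Threading;

static class RetryHelper
{
    // Hypothetical helper (not an Rx API): runs `action` until it completes
    // without throwing, and returns the number of attempts that were needed.
    // Once `maxAttempts` is reached, the last exception propagates to the caller.
    public static int InvokeUntilSuccess(Action action, int maxAttempts, TimeSpan delay)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return attempt;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Wait before the next attempt. This blocks the calling thread.
                Thread.Sleep(delay);
            }
        }
    }
}
```

Inside the query it might be used as .Do(l => RetryHelper.InvokeUntilSuccess(() => ProcessNotification(l), 5, TimeSpan.FromSeconds(1))). Note that this blocks the thread delivering the notification for as long as the retries take.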

Update

The Do operator is not asynchronous. Use SelectMany instead so that it's part of your query. That way, cancelling the subscription will also cancel the request. If your request method returns a Task<T> instead of an IObservable<T>, then consider using the overload that accepts a CancellationToken as well.
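As a rough sketch of the SelectMany suggestion, assuming the request method returns a Task<int>: SendRequestAsync below is a hypothetical stand-in for the real network call, and RequestPipeline and Build are names invented for this example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class RequestPipeline
{
    // Hypothetical stand-in for the real network request.
    public static async Task<int> SendRequestAsync(int notification, CancellationToken token)
    {
        // Task.Delay observes the token, so the simulated request is
        // cancelled when the token is signalled.
        await Task.Delay(TimeSpan.FromMilliseconds(50), token);
        return notification;
    }

    // Projects each notification into a Task. The CancellationToken handed to
    // the selector is tied to the subscription, so disposing the subscription
    // requests cancellation of requests that are still in flight.
    public static IObservable<int> Build(IObservable<int> data) =>
        data.SelectMany((int n, CancellationToken token) => SendRequestAsync(n, token));
}
```

Usage would be along the lines of RequestPipeline.Build(data).Subscribe(Downstream). Because SelectMany merges the results of the tasks as they complete, results are not guaranteed to arrive in the order of the source notifications.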

There's an easier way to make a cold observable that replays the last notification and can be retried simply by using the Retry operator, as follows.

(Untested)

data.Replay(r => r.YourStateMachine.SelectMany(SendRequestAsync).Retry(), 1);

James' answer here can fill in YourStateMachine.

Upvotes: 1
