Reputation: 10458
I want to make sure my Rx notifications are not lost when they're processed with my Consumer's Do
delegate. I have a Producer that generates messages that I need to process, and retry processing if it fails.
Desired marble diagram:
Producer 1 2 3 4 5
Consumer 1 X 2 3 X X 4 5
Downstream 1 2 3 4 5
The standard Retry
won't help here since it'll resubscribe to the Producer after an error. This will lose the notification that failed processing and continue with the next notification.
Retry
marble diagram:
Producer 1 2 3 4 5
Consumer 1 X 3 X 5
So far, I have this code, but it doesn't seem right to me:
static void RetryWithBacklog()
{
var data = Enumerable.Range(1, 100).ToObservable();
var backlog = new Subject<long>();
backlog
.Merge(data)
.Retry()
.Do(l =>
{
try
{
ProcessNotification(l);
}
catch (Exception)
{
backlog.OnNext(l);
}
})
.Subscribe();
Console.ReadKey();
}
The Do
operation will perform a network request based on the Producer's notifications. These might fail, so I need to retry it without losing information to be transmitted. Of course I could blindly while try/catch
the network request, but the requests should not happen as long as the network connection is known to be down (see this question).
static void RetryBeforePassingDownstream()
{
var data = Enumerable.Range(1, 100).ToObservable();
data
.Replay(l => l.SelectMany(ProcessNotification).Retry(), 1)
.Subscribe(Downstream);
Console.ReadKey();
}
static IObservable<int> ProcessNotification(int notification)
{
Console.WriteLine("process: {0}", i);
// either:
// throw new Exception("error");
// or:
return Observable.Return(notification);
}
static void Downstream(int i)
{
Console.WriteLine("downstream: {0}", i);
}
Upvotes: 1
Views: 157
Reputation: 2652
The point of the Retry
operator is that it works on cold observables by resubscribing, thus causing subscription side effects each time that it "retries". For instance, your observable might send a web request each time that an observer subscribes. In your example, the Retry
operator doesn't make any sense, especially considering that Interval
never calls OnError
.
Retry
semantics can technically exist separately in the observable and the observer. In your situation, couldn't you simply call ProcessNotification
until it succeeds in the Do
operator?
Or is your problem simply that downstream observers won't observe the notification when Do
throws? I guess that's not what you meant, because in that case you could simply swallow the exception.
Or is your problem that there's a chance that the notification will change in some way or get filtered out by upstream operators if the notification were to be replayed? In that case you could probably just use Do
followed by Catch
and recursion to achieve the same thing without requiring a Subject
, but I can't figure out why you'd ever really want to do it.
Update
The Do
operator is not asynchronous. Use SelectMany
instead so that it's part of your query. That way, cancelling the subscription will also cancel the request. If your request method returns a Task<T>
instead of an IObservable<T>
, then consider using the overload that accepts a CancellationToken
as well.
There's an easier way to make a cold observable that replays the last notification and can be retried simply by using the Retry
operator, as follows.
(Untested)
data.Replay(r => r.YourStateMachine.SelectMany(SendRequestAsync).Retry(), 1);
James' answer here can fill in YourStateMachine.
Upvotes: 1