ChrisCa
ChrisCa

Reputation: 11006

RX - rethrow an error in containing method

I need to translate an error in an RX stream (IObservable) into an exception in the method that contains the subscription to the stream

(because of this issue https://github.com/aspnet/SignalR/pull/1331 , Whereby errors arent serialised to clients.) Once this issue is fixed I will revert to handling error properly

e.g.

I have the following method

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return _mySvc.ThingChanged();
}

So I have tried to subscribe to the stream and rethrow the error, but it still doesnt get transmitted to the client:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    _mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
    return _mySvc.ThingChanged();
}

private void OnError(Exception exception)
{
    throw new Exception(exception.Message);
}

What I need is the equivelent of throwing in the LiveStream method

e.g. this error is propogated to the client

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    throw new Exception("some error message");
    return _mySvc.ThingChanged();
}

any ideas how to achieve this?

Upvotes: 5

Views: 884

Answers (2)

Marc L.
Marc L.

Reputation: 3399

I have found this as well, especially with a "contained" reactive pipeline—that is, one with a well-defined beginning and end. In situations like those, it may suffice to simply allow underlying exceptions to bubble up to the containing scope. But as you have found, that concept is rather foreign to Rx generally: what happens in the pipeline stays in the pipeline.

The only way out of this that I have found in a contained scenario is to "slip" the error out of the stream using Catch(), and hand back an empty IObservable to allow the stream to halt naturally (otherwise, you'll hang if you're awaiting an IObservable for completion).

This will not work within your LiveStream() method, because that context/scope should have passed out of existence long before you're consuming your stream. So, this will have to happen in the context that contains the whole pipeline.

Exception error = null;
var source = LiveStream()
  .Catch<WhatYoureStreaming, Exception>(ex => {error = ex; return Observable.Empty<WhatYoureStreaming>(); })
  ...

await source; // if this is how you're awaiting completion

// not a real exception type, use your own
if (error != null) throw new ContainingException("oops", error);

Just don't throw error there at the end, you'll lose the original stack trace.

Upvotes: 3

Enigmativity
Enigmativity

Reputation: 117029

Try this code:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return
        _mySvc
            .ThingChanged()
            .Materialize()
            .Do(x =>
            {
                if (x.Kind == NotificationKind.OnError)
                {
                    OnError(x.Exception);
                }
            })
            .Dematerialize();
}

I'm not sure that this is the best way to go - throwing exceptions like this can cause you grief inside a stream where you end up with the wrong exception handlers firing. You might need to find another solution.

Upvotes: 0

Related Questions