ittay
ittay

Reputation: 689

rx disposes subscription on exception if has linq methods in the middle

When I subscribe with a method which sometimes throws an exception, I get 2 different acts. If I concatenate LINQ methods in the middle, the subscription is disposed, other wise it is not, why?

void main(){
  var numbersSubject=new Subject<int>();

  numbersSubject.subscribe(throwMethod);   // 1,2,3,4,6,7,8,9,10
  // numbersSubject.select(num=>num).subscribe(throwMethod);   // 1,2,3,4

  for(int i=0;i<10;i++)
  {
    try{
      numbersSubject.OnNext(i);
    }catch{}
  }
}

void throwMethod(int num)
{
   if(num==5)
       throw new Exception();
   Console.writeLine(i);
}

Upvotes: 1

Views: 198

Answers (1)

James World
James World

Reputation: 29786

So, to clarify:

When the version without the LINQ operator is run we see:

0,1,2,3,4,6,7,8,9

When the version with the LINQ operator is run we see:

0,1,2,3,4

Notably, when you subscribe a good and a bad behaving Observer to the second version you get output as shown below (output in the comments) like so:

numbersSubject.Subscribe(throwMethod);   
var source = numbersSubject.Select(num=>num);
source.Subscribe(Console.WriteLine); // 0,1,2,3,4,5,6,7,8,9
source.Subscribe(throwMethod);   // 0,1,2,3,4

Notice that the "good" Observer gets all events.

The reason is that the build in operators have a protective layer that disposes the subscription of a bad Observer.

From the Rx source code we see:

Safeguarding of the pipeline against rogue observers is required for proper resource clean-up. Consider the following example:

var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
var ys  = <some random sequence>;
var res = xs.CombineLatest(ys, (x, y) => x + y);

The marble diagram of the query above looks as follows:

xs  -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
               |     |     |     |     |     |     |     |     |
ys  --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
            |  |  |  |     |  |  |  |  |     |     |  |  |     |
            v  v  v  v     v  v  v  v  v     v     v  v  v     v
res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
                              |
                             @#&

Notice the free-threaded nature of Rx, where messages on the resulting sequence are produced by either of the two input sequences to CombineLatest.

Now assume an exception happens in the OnNext callback for the observer of res, at the indicated point marked with @#& above. The callback runs in the context of ys, so the exception will take down the scheduler thread of ys. This by itself is a problem (that can be mitigated by a Catch operator on IScheduler), but notice how the timer that produces xs is kept alive.

The safe-guarding code ensures the acquired resources are disposed when the user callback throws.


The above is implemented in an internal class called AutoDetachObserver and wrapped by SafeObserver used by most of the internal operators.

All this does is wrap each OnXXX invocation with try...finally Exception handlers whose finally block disposes the subscription in the event of an error - e.g. OnNext looks like:

        var __noError = false;
        try
        {
            observer.OnNext(value);
            __noError = true;
        }
        finally
        {
            if (!__noError)
                Dispose();
        }

Subject, for performance reasons, doesn't have this layer of protection. A quick way of adding it (and protecting against other abuses) is to add the Synchronize() operator to the subject. e.g:

var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Subscribe(throwMethod);

Will output

0,1,2,3,4,6,7,8,9

But adding Synchronize as shown:

var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Synchronize().Subscribe(throwMethod);

Will output

0,1,2,3,4

consistent with the other built-in operators (plus any you implement with Observable.Create).

(EDIT) Notes on handling observer exceptions

Prompted by the comments, here are some additional notes on handling of exceptions thrown from observers.

When an exception originates in the OnNext handler, it's below the Rx code in the stack and it's therefore not possible to responsibly pass it "back" to the user. The user has to be considered dead at that point. All we can reasonably do is dispose the subscription and clean up the resources due to it. This is the consequence of push-based code. Contrast with IEnumerable where exceptions can be thrown to the client code, because it's the client doing the pulling.

Note that certain operators contain user-provided logic (like the predicate expression in a Where operator) that will propagate errors through the OnError channel to an observer, but once an observer has died by throwing an exception in it's own code - that's it. It won't be called via any OnXXX methods again.

There's a ton more on this starting about half way down this epic Bart de Smet post.

Upvotes: 8

Related Questions