Karl
Karl

Reputation: 275

Rx: Different TakeUntil behaviour with Timer based IObservable vs other IObservable

I'm experiencing different behaviour with an TakeUntil and different implementations of IObservable as the parameter. I'm trying to understand WHY i get the different behaviour. Using Linqpad:

async Task Main()
{
    var byTimer = true;

    var ending = Observable.FromEvent<long>(
        handler => endingHandler += handler,
        handler => endingHandler -= handler);

    var s = new Subject<long>();

    using (var disposibleSub =
        Observable
            .Interval(TimeSpan.FromSeconds(.2))
            .TakeUntil(byTimer ? Observable.Timer(TimeSpan.FromSeconds(1.5)) : ending)
            .DumpLatest()
            .Subscribe(Observer.Checked(s)))
    {
        if (endingHandler != null)
        {
            int r = Console.Read();
            endingHandler?.Invoke(r);
        }
        var v = await s.Count();
        Console.WriteLine("Count of items: {0}", v);
    }
}

public event Action<long> endingHandler;

The one with the timer Count always returns the correct value. However, if I change it to use the FromEvent implementation, I always get 0. Obviously the difference is in the implementation of the two. I've also tried using a Subject implementation for the TakeUntil with the same results as the fromEvent.

The timer result is what I expected.

An explanation why would be appreciated! Thanks.

Upvotes: 0

Views: 122

Answers (1)

Enigmativity
Enigmativity

Reputation: 117027

When byTimer is true then the ending observable never gets a subscription - remember that the observable pipeline is only instantiated when a subscription arrives - so in this case the handler => endingHandler += handler attach event code doesn't run and thus endingHandler is null. That means that the Console.Read() isn't called so the code drops immediately to var v = await s.Count(); and this then catches all of the values passing thru s.

However, when byTimer is false then endingHandler is not null so then Console.Read() is called. When the console is read you immediately invoke endingHandler which stops the observable and calls OnCompleted on the subject. So when it hits var v = await s.Count(); it immediately gets a completed signal, missing all of the previously produced values, and thus you get a count of zero.

If you change your code to be like this:

    int r = Console.Read();
    if (endingHandler != null)
    {
        endingHandler?.Invoke(r);
    }
    var v = await s.Count();

Then the two observables behave exactly the same way.

If you then also go and change the Subject to ReplaySubject then the code should behave as you were originally expecting it to.

Upvotes: 2

Related Questions