Meirion Hughes

Reputation: 26398

Disposing an IConnectableObservable doesn't send OnCompleted

I've stumbled across a problem while trying to handle a worst case: disposing the connection of an IConnectableObservable that has subscriptions, before the source has finished.

I've written a contrived example that reproduces the problem.

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .Publish();

var disposable = hotSource.Connect();

Here is my hot observable. I've added a delay in so that I can subscribe to it before it fires off the value.

Below I build an awaitable observable that should let me wait until hotSource completes. I believe I should be subscribed by this point.

var awaitable = hotSource
    .Do((Console.WriteLine))
    .Finally(() => Console.WriteLine("Complete"))
    .LastOrDefaultAsync();

Now if I dispose immediately then await my subscription:

disposable.Dispose();
await awaitable;

it simply hangs the application entirely and Finally is never called.

I'm expecting my awaitable to return either OnCompleted or at worst OnError with ObjectDisposedException. Also, if you connect after the subscription, it still hangs. Any thoughts?

[Update]

Taking supertopi's answer into account (that I wasn't subscribing until after the dispose), I've reformulated the problem with an explicit subscription before the dispose.

var mutex = new SemaphoreSlim(0);

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .Publish();

var subscription = Observable.Create(
    (IObserver<int> observer) =>
    {
        Console.WriteLine("Subscribed");
        return hotSource.Subscribe(observer);
    })
    .Finally(() => mutex.Release())
    .Subscribe(Console.WriteLine);

hotSource
    .Connect()
    .Dispose();

await mutex.WaitAsync();
Console.WriteLine("Complete");

This again hangs, but it's definitely subscribing to the IConnectableObservable before the connection is disposed.

Upvotes: 3

Views: 414

Answers (1)

supertopi

Reputation: 3488

LastOrDefaultAsync() returns an IObservable so it does not subscribe you to the sequence. The await subscribes you to the sequence.
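To make the timing visible, here is a minimal sketch, assuming the System.Reactive package and C# top-level statements. The `subscriptions` counter and the `Observable.Defer` wrapper are only there for illustration and are not part of the question's code. `ToTask()` is used as one way to subscribe eagerly instead of waiting for the `await`.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

var subscriptions = 0;

// Defer runs its factory once per subscription,
// so the counter records when a subscription actually happens.
var source = Observable.Defer(() =>
{
    subscriptions++;
    return Observable.Return(1);
});

// Building the query does not subscribe to anything.
var lazy = source.LastOrDefaultAsync();
Console.WriteLine(subscriptions); // 0

// ToTask() subscribes right away, before any await.
var task = lazy.ToTask();
Console.WriteLine(subscriptions); // 1

Console.WriteLine(await task); // 1
```

Awaiting an observable directly behaves like the `ToTask()` call here, except that the subscription is only made at the point of the `await`.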

Check the documentation comment on the IDisposable returned by IConnectableObservable.Connect():

Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.

This means that all notifications stop being emitted, including OnNext, OnError and OnCompleted.

So if you await after disposing the connection of the IConnectableObservable, you will not receive any notifications. You can call Connect again on hotSource to receive notifications.
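If you need subscribers to see a completion when you shut the source down early, one possible approach (a sketch, not the only option) is to end the sequence with a signal instead of relying on the connection's Dispose. The `stop` subject and the `finallyRan` flag below are hypothetical names introduced for this example. It assumes the System.Reactive package and C# top-level statements.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

// Hypothetical stop signal: firing it completes the published sequence.
var stop = new Subject<Unit>();

var hotSource = Observable
    .Return(1)
    .Delay(TimeSpan.FromMilliseconds(500))
    .TakeUntil(stop) // placed before Publish so every subscriber sees OnCompleted
    .Publish();

var finallyRan = false;

// ToTask() subscribes immediately, so the subscription exists before Connect.
var completion = hotSource
    .Finally(() => finallyRan = true)
    .LastOrDefaultAsync()
    .ToTask();

var connection = hotSource.Connect();

// Complete the sequence first, then release the connection.
stop.OnNext(Unit.Default);
connection.Dispose();

var last = await completion;
Console.WriteLine($"last={last}, finallyRan={finallyRan}");
```

Because the stop signal fires before the 500 ms delay elapses, the delayed value never arrives, so the await should return the default value 0 rather than hang, and the Finally callback should have run.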

Upvotes: 2
