Reputation: 26398
I've stumbled across a problem while trying to handle a worst case: disposing of an IConnectableObservable that has subscriptions before it has finished.
I've written a contrived example that reproduces the problem.
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var disposable = hotSource.Connect();
Here is my hot observable. I've added a delay in so that I can subscribe to it before it fires off the value.
Below I build an awaitable observable that should let me wait until hotSource completes. I believe I should be subscribed by this point:
var awaitable = hotSource
.Do(Console.WriteLine)
.Finally(() => Console.WriteLine("Complete"))
.LastOrDefaultAsync();
Now if I dispose immediately then await my subscription:
disposable.Dispose();
await awaitable;
it simply hangs the application entirely and Finally is never called.
I'm expecting my awaitable to return either OnCompleted or, at worst, OnError with an ObjectDisposedException. Also, if you connect after the subscription, it still hangs. Any thoughts?
[Update]
Taking supertoi's answer (that I wasn't subscribing until after the dispose) I've reformulated the problem with an explicit subscription before the dispose.
var mutex = new SemaphoreSlim(0);
var hotSource = Observable
.Return(1)
.Delay(TimeSpan.FromMilliseconds(500))
.Publish();
var subscription = Observable.Create(
(IObserver<int> observer) =>
{
Console.WriteLine("Subscribed");
return hotSource.Subscribe(observer);
})
.Finally(() => mutex.Release())
.Subscribe(Console.WriteLine);
hotSource
.Connect()
.Dispose();
await mutex.WaitAsync();
Console.WriteLine("Complete");
This hangs again, but it's definitely subscribing to the IConnectableObservable before it has been disposed.
Upvotes: 3
Views: 414
Reputation: 3488
LastOrDefaultAsync() returns an IObservable, so it does not subscribe you to the sequence.
The await is what subscribes you to the sequence.
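To make the point above concrete, here is a minimal sketch, assuming the System.Reactive NuGet package is referenced. The class name LazySubscriptionSketch and the subscribed flag are hypothetical, introduced only for illustration. Observable.Defer runs its factory once per subscription, so the flag shows when the subscription actually happens.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class LazySubscriptionSketch
{
    // Returns the flag before the await, the flag after it, and the awaited value.
    public static async Task<(bool Before, bool After, int Value)> RunAsync()
    {
        var subscribed = false;

        // The factory passed to Defer runs only when something subscribes.
        var source = Observable.Defer(() =>
        {
            subscribed = true;
            return Observable.Return(1);
        });

        // Building the query does not subscribe to the source.
        var awaitable = source.LastOrDefaultAsync();
        var before = subscribed;

        // Awaiting subscribes and yields the last element.
        var value = await awaitable;

        return (before, subscribed, value);
    }

    public static async Task Main()
    {
        var result = await RunAsync();
        // Prints "before=False, after=True, value=1".
        Console.WriteLine($"before={result.Before}, after={result.After}, value={result.Value}");
    }
}
```

If the subscription has to exist before the await, one option is to call ToTask() (from System.Reactive.Threading.Tasks) on the query, since that subscribes immediately.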
Check the documentation comment on the disposable returned by IConnectableObservable.Connect():
Disposable used to disconnect the observable wrapper from its source, causing subscribed observer to stop receiving values from the underlying observable sequence.
This means that all notifications, including OnNext, OnError and OnCompleted, stop being emitted.
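The statement above can be checked with a small sketch, again assuming the System.Reactive package. The class name DisconnectSketch and the seen list are hypothetical. Materialize turns every OnNext, OnError and OnCompleted into a recordable value, so an empty list after the wait indicates that nothing at all was forwarded.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DisconnectSketch
{
    // Returns how many notifications reached a subscriber that was
    // attached before the connection was disposed.
    public static async Task<int> CountNotificationsAsync()
    {
        var hotSource = Observable
            .Return(1)
            .Delay(TimeSpan.FromMilliseconds(500))
            .Publish();

        var seen = new List<NotificationKind>();

        // Subscribe first, recording the kind of every notification.
        using (hotSource.Materialize().Subscribe(n => seen.Add(n.Kind)))
        {
            // Connect and disconnect before the delayed value fires.
            hotSource.Connect().Dispose();

            // Wait well past the 500 ms delay.
            await Task.Delay(TimeSpan.FromSeconds(1));

            return seen.Count;
        }
    }

    public static async Task Main()
    {
        // Prints 0: no OnNext, OnError or OnCompleted arrived.
        Console.WriteLine(await CountNotificationsAsync());
    }
}
```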
So, if you await after disposing the connection to the IConnectableObservable, you will not receive any notifications. You can call Connect again on hotSource to receive notifications.
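A minimal sketch of the reconnect suggestion, assuming the System.Reactive package. The class name ReconnectSketch is hypothetical. ToTask() is used here instead of a bare await so that the subscription exists before the first connection is disposed. This relies on the source being cold (Return plus Delay), so a second Connect replays it from the start.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ReconnectSketch
{
    public static async Task<int> RunAsync()
    {
        var hotSource = Observable
            .Return(1)
            .Delay(TimeSpan.FromMilliseconds(500))
            .Publish();

        // ToTask subscribes immediately, before any connection is made.
        Task<int> last = hotSource.LastOrDefaultAsync().ToTask();

        // Connect and disconnect straight away: nothing is forwarded,
        // so the task is still pending at this point.
        hotSource.Connect().Dispose();

        // Connecting again resubscribes to the underlying source, which
        // emits its value and completes for the existing subscriber.
        using (hotSource.Connect())
        {
            return await last;
        }
    }

    public static async Task Main()
    {
        // Prints 1 after roughly half a second.
        Console.WriteLine(await RunAsync());
    }
}
```

Without the second Connect, the task in this sketch would never complete, which matches the hang described in the question.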
Upvotes: 2