Reputation: 1013
I'm writing an ICommand that performs an asynchronous operation and publishes the results to an observable sequence. The results are supposed to be lazy: nothing happens unless someone subscribes to the result. If the user disposes their subscription to the results, the operation is supposed to be cancelled. My code below (hugely simplified) generally works. The tricky part is that when Execute is called, I want the async operation to run just once even if there are many subscribers to the results. I thought I just needed to apply Replay().RefCount() before publishing the result, but this doesn't work, or at least it doesn't work in my test, where the observable function completes quickly. The first subscriber gets the entire result, including the completion message, which causes the published result to be disposed and then completely recreated for the second subscriber. One hack that got this to work is inserting a 1-tick delay at the end of the execution function. This gives the second subscriber enough time to come along and get the results.
Is this hack legit? I'm not really sure how it works or whether it will hold up in non-test scenarios.
What is a less hacky way to ensure the result is only enumerated once? One idea was that when the user subscribes to the result, I'd copy the results into a ReplaySubject and publish that, but I couldn't figure out how to make it work. The first subscriber should get the ball rolling on calculating the results and stuffing them into the ReplaySubject, while the second subscriber should just see the ReplaySubject. Maybe this calls for some kind of custom Observable.Create.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;

public class AsyncCommand<T> : IObservable<IObservable<T>>
{
    private readonly Func<IObservable<T>> _execute;
    private readonly Subject<IObservable<T>> _results;

    public AsyncCommand(Func<IObservable<T>> execute)
    {
        _execute = execute;
        _results = new Subject<IObservable<T>>();
    }

    // This would be ICommand.Execute, but I've simplified here
    public void Execute() => _results.OnNext(
        _execute()
            .Delay(TimeSpan.FromTicks(1)) // Take this line out and the test fails
            .Replay()
            .RefCount());

    // Subscribe to the inner observable to see the results of command execution
    public IDisposable Subscribe(IObserver<IObservable<T>> observer) =>
        _results.Subscribe(observer);
}

[TestClass]
public class AsyncCommandTest
{
    [TestMethod]
    public void IfSubscribeManyTimes_OnlyExecuteOnce()
    {
        int executionCount = 0;
        // The source completes synchronously, which is what triggers the problem
        var cmd = new AsyncCommand<int>(() => Observable.Create<int>(obs =>
        {
            obs.OnNext(Interlocked.Increment(ref executionCount));
            obs.OnCompleted();
            return Disposable.Empty;
        }));

        // Two independent subscribers to the results of the same execution
        cmd.Merge().Subscribe();
        cmd.Merge().Subscribe();
        cmd.Execute();

        Assert.AreEqual(1, executionCount);
    }
}
Here is how I tried using ReplaySubject. It works, but the result isn't published lazily and the subscription gets lost: disposing the subscription to the results won't cancel the operation.
public void Execute()
{
    ReplaySubject<T> result = new ReplaySubject<T>();
    var lostSubscription = _execute().Subscribe(result);
    _results.OnNext(result);
}
Upvotes: 0
Views: 146
Reputation: 1013
This seems to work.
public void Execute()
{
    int subscriptionCount = 0;
    int executionCount = 0;
    var result = new ReplaySubject<T>();
    var disposeLastSubscription = new Subject<Unit>();

    _results.OnNext(Observable.Create<T>(obs =>
    {
        Interlocked.Increment(ref subscriptionCount);

        // Only the first subscriber starts the operation; the source is
        // copied into the ReplaySubject so later subscribers just replay it.
        if (Interlocked.Increment(ref executionCount) == 1)
        {
            Observable
                .Defer(_execute)
                .TakeUntil(disposeLastSubscription) // cancels the source on the signal below
                .Subscribe(result);
        }

        return new CompositeDisposable(
            result.Subscribe(obs),
            Disposable.Create(() =>
            {
                // When the last subscriber goes away, signal the source to stop
                if (Interlocked.Decrement(ref subscriptionCount) == 0)
                {
                    disposeLastSubscription.OnNext(Unit.Default);
                }
            }));
    }));
}
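The same idea can probably be expressed with Replay() and a manual Connect() instead of a hand-managed ReplaySubject. The following is only a sketch under assumptions: LazyShare and ExecuteOnce are hypothetical names (not part of Rx.NET or of the code above), and it connects at most once, so the source is never executed a second time even if the subscriber count drops to zero and rises again. One difference from the code above: if the last subscriber leaves before the source finishes, later subscribers see the partial replay without a completion, because nothing completes the buffer on cancellation.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper for illustration only.
public static class LazyShare
{
    public static IObservable<T> ExecuteOnce<T>(Func<IObservable<T>> execute)
    {
        var gate = new object();
        // Defer keeps things lazy: execute() is not called until Connect().
        var connectable = Observable.Defer(execute).Replay();
        var connection = new SingleAssignmentDisposable();
        bool connected = false;
        int subscribers = 0;

        return Observable.Create<T>(observer =>
        {
            // Attach to the replay buffer first so no notification is missed.
            IDisposable subscription = connectable.Subscribe(observer);

            lock (gate)
            {
                subscribers++;
                if (!connected)
                {
                    // Set the flag before connecting: a synchronous source
                    // may complete (and re-enter) during Connect().
                    connected = true;
                    connection.Disposable = connectable.Connect();
                }
            }

            return Disposable.Create(() =>
            {
                subscription.Dispose();
                lock (gate)
                {
                    // Last subscriber gone: cancel the running operation.
                    // Disposing again after completion is a harmless no-op.
                    if (--subscribers == 0)
                    {
                        connection.Dispose();
                    }
                }
            });
        });
    }
}
```

With this helper, Execute would reduce to something like _results.OnNext(LazyShare.ExecuteOnce(_execute)); and the 1-tick Delay should no longer be needed, since a synchronous completion for the first subscriber can no longer trigger a reconnect.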
Upvotes: 1