Reputation: 89
I'm having trouble with the way ReactiveCommand deals with ObserveOn and SubscribeOn.
I have an API that returns an observable sequence of strings. It looks like this:
    public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
    {
        return Observable.Create<string>(obs =>
        {
            for (int i = 0; i < numParagraphs; i++)
            {
                Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(1000);
                obs.OnNext("Some String");
            }
            obs.OnCompleted();
            return Disposable.Empty;
        });
    }
I'm using ReactiveCommand.CreateAsyncObservable to invoke this, using SubscribeOn(RxApp.TaskpoolScheduler) (to ensure the Thread.Sleep doesn't happen on the UI thread), and ObserveOn(RxApp.MainThreadScheduler) to draw strings on my UI thread.
Unfortunately, this all executes synchronously (on the same thread), and I'm not sure why. This is what the VM code looks like:
    DownloadDocument = ReactiveCommand
        .CreateAsyncObservable(_ =>
        {
            Console.WriteLine("ViewModel Invoking On thread {0}", Thread.CurrentThread.ManagedThreadId);
            return _documentService.GetDocumentObservable(NumParagraphs, 0);
        });
    DownloadDocument
        .SubscribeOn(RxApp.TaskpoolScheduler)
        .ObserveOn(RxApp.MainThreadScheduler)
        .Subscribe(p =>
        {
            Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
            Document.Add(p);
        },
        x => { },
        () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
Everything executes on the same thread and blocks the UI thread. If I invoke it "the old-fashioned way", everything works as expected (as below):
    Something = ReactiveCommand.Create();
    Something.Subscribe(x =>
    {
        _documentService.GetDocumentObservable(NumParagraphs, 0)
            .SubscribeOn(RxApp.TaskpoolScheduler)
            .ObserveOn(RxApp.MainThreadScheduler)
            .Subscribe(p =>
            {
                Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
                Document.Add(p);
            },
            ex => { },
            () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
    });
No blocked threads here.
Is there something I'm missing when it comes to using ReactiveCommands with Observable APIs?
Upvotes: 2
Views: 898
Reputation: 117029
Your GetDocumentObservable method doesn't give Rx any opportunity to run the code on another thread, so it produces all the values and signals completion immediately during subscription, before the call to .Subscribe(...) has returned.
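To make that concrete, here is a minimal sketch in plain Rx (no ReactiveUI) showing that an Observable.Create body written this way runs on the subscribing thread and completes before Subscribe returns. The class and method names (SynchronousCreateDemo, RunsOnCallerThread) are made up for illustration, and the loop is cut down to three items with no sleep.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SynchronousCreateDemo
{
    // Hypothetical helper: returns true when every OnNext ran on the
    // subscribing thread AND OnCompleted fired before Subscribe returned.
    public static bool RunsOnCallerThread()
    {
        int callerThread = Thread.CurrentThread.ManagedThreadId;
        bool allOnCaller = true;
        bool completedBeforeReturn = false;

        // Same shape as the method in the question: the whole loop runs
        // inside the subscribe callback, with nothing scheduled elsewhere.
        IObservable<string> source = Observable.Create<string>(obs =>
        {
            for (int i = 0; i < 3; i++)
            {
                obs.OnNext("Some String");
            }
            obs.OnCompleted();
            return Disposable.Empty;
        });

        source.Subscribe(
            _ =>
            {
                if (Thread.CurrentThread.ManagedThreadId != callerThread)
                {
                    allOnCaller = false;
                }
            },
            () => completedBeforeReturn = true);

        // Checked immediately after Subscribe returns, with no waiting.
        return allOnCaller && completedBeforeReturn;
    }

    public static void Main()
    {
        Console.WriteLine(RunsOnCallerThread());
    }
}
```

If the helper reports true, the sequence was fully produced on the caller's thread, which is the blocking behaviour described in the question.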
Two key things to be wary of when writing code like this are return Disposable.Empty; and Thread.Sleep(...);. They should be red flags for you.
Instead, you should always try to implement your methods using the other built-in methods first, and only move on to the Create "roll-your-own" approach when you have to.
Luckily, there's a very powerful built-in operator that meets your needs perfectly: Generate. This method is very useful for generating sequences that incorporate a "sleep".
Here's what it would look like:
    public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
    {
        return
            Observable
                .Generate(
                    0,
                    i => i < numParagraphs,
                    i => i + 1,
                    i => "Some String",
                    i => i == 0
                        ? TimeSpan.Zero
                        : TimeSpan.FromSeconds(1.0))
                .Do(x => Console.WriteLine(
                    "Service On thread {0}",
                    Thread.CurrentThread.ManagedThreadId));
    }
This does everything that your method does and should behave as you wanted it to.
Upvotes: 0
Reputation: 74654
ReactiveCommand itself subscribes to your source when the command is invoked, so that subscription doesn't pick up your SubscribeOn, which only affects how you subscribe to the command's results. The easiest way to fix this is to just wrap your code in Task.Run:
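    return Observable.Create<string>(obs =>
    {
        // Must be initialized: the compiler rejects reading an unassigned
        // captured local inside the lambda below.
        bool stopEarly = false;
        Task.Run(() =>
        {
            for (int i = 0; i < numParagraphs; i++)
            {
                Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(1000);
                obs.OnNext("Some String");
                // Stop without completing once the subscription is disposed.
                if (stopEarly) return;
            }
            obs.OnCompleted();
        });
        // Disposing the subscription asks the background loop to stop.
        return Disposable.Create(() => stopEarly = true);
    });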
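As a possible alternative to Task.Run, SubscribeOn can be applied to the source observable itself (for example, on the value returned from the lambda passed to CreateAsyncObservable), so that whoever subscribes to it picks up the scheduler. The sketch below uses plain Rx only, with TaskPoolScheduler.Default standing in for RxApp.TaskpoolScheduler; the names SubscribeOnSourceDemo and ProducerRunsOffCallerThread are hypothetical, and I haven't verified this against every ReactiveUI version.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SubscribeOnSourceDemo
{
    // Hypothetical helper: returns true when the Observable.Create body
    // ran on a different thread from the one that subscribed.
    public static bool ProducerRunsOffCallerThread()
    {
        int callerThread = Thread.CurrentThread.ManagedThreadId;
        int producerThread = callerThread;

        // Same synchronous style of source as in the question.
        IObservable<string> source = Observable.Create<string>(obs =>
        {
            producerThread = Thread.CurrentThread.ManagedThreadId;
            obs.OnNext("Some String");
            obs.OnCompleted();
            return Disposable.Empty;
        });

        // SubscribeOn is attached to the source, so the subscribe call
        // (and therefore the Create body) is scheduled on the task pool.
        // Wait() blocks here only so the demo can inspect the result.
        source
            .SubscribeOn(TaskPoolScheduler.Default)
            .Wait();

        return producerThread != callerThread;
    }

    public static void Main()
    {
        Console.WriteLine(ProducerRunsOffCallerThread());
    }
}
```

In the view model, the equivalent change would be to return _documentService.GetDocumentObservable(NumParagraphs, 0).SubscribeOn(RxApp.TaskpoolScheduler) from the command's lambda and keep ObserveOn(RxApp.MainThreadScheduler) on the command's output.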
Upvotes: 2