Reputation: 27575
I'm having a hard time trying to figure the proper way to schedule long-running reactive property "getters" in my ViewModel.
This excerpt from Intro to RX describes exactely what I want to do:
- respond to some sort of user action
- do work on a background thread
- pass the result back to the UI thread
- update the UI
Only in this case besides user interaction I want to react to change from other properties.
Below is the generic template I am using to get a derived property from an original property (in the actual code, there are chains of cascading derived properties).
In a Reactive ViewModel (inheriting from ReactiveObject) I already have some properties that derive from others. For exemple, when Original
changes, Derived
is recalculated.
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// ObserveOn? SubscribeOn? Which scheduler?
.Select(derivedValue => LongRunningCalculation(originalValue))
// Same thing here: ObserveOn? SubscribeOn? Which scheduler?
.ToProperty(this, x => x.Derived, out _derived); // should I use the `scheduler:` in this method?
My problems are: I have no idea how these different "design choices" should be combined to get my desired responsive UI:
RxApp.TaskpoolScheduler
, RxApp.MainThreadScheduler
, NewThreadScheduler.Default
, and possibly others.SubscribeOn
vs ObserveOn
of even ObserveOnDispatcher
or the scheduler:
parameter of ToProperty
?Select
operator, but I'm not so sure. I'm not sure Select
is even needed, to be frank.Binding.IsAsync
to true
, but I tried it and haven't seem much difference, but again, maybe it is because of the other factors.SynchronizationContext
and ThreadPriority
relavant here? Is there a way to configure them in the code shown?ReactiveCommand
or some other ReactiveUI class for this?The most nerve-wrecking fact is that, with some combinations, the calculations work properly, but block the UI, while with some other combinations, the values are asynchronously calculated and the UI is a bit less blocked, but sometimes part of the derived values (in a collection of items, for example) are not available!
Sorry if I'm asking too much, but I didn't find any authoritative expected way to do what I need in the docs.
Upvotes: 5
Views: 2714
Reputation: 3904
In Rx.NET there are a few schedulers, including a special one that's exclusive to WPF.
TaskPoolScheduler
runs code on the task pool. This is a bit like running code inside a Task
.NewThreadScheduler
spawns a new thread to run the code on. Generally don't use this operator, unless you know that you "need" it (you almost never don't)DispatcherScheduler
runs code on the UI thread. Use this when you're going to set your properties in the VMRxUI brings two platform agnostic scheduler abstractions. No matter what platform you're on (WPF, UWP, Xamarin.iOS, Xamarin.Android) RxApp.MainThreadScheduler
will always refer to the UI thread scheduler, while RxApp.TaskPoolScheduler
will refer to something akin to a background thread.
If you want to keep it simple, just use the RxApp
schedulers; RxApp.MainThreadScheduler
for UI stuff and RxApp.TaskPoolScheduler
for background/heavy duty stuff.
The name SubscribeOn()
is a bit confusing as it doesn't directly affect Subscribe()
method. SubscribeOn()
decides which scheduler the observable will start on; on which scheduler the original/first subscription will be done (not which scheduler the Subscribe()
method will execute on). I like to think that SubsribeOn()
moves up the observable chain to the top and make sure the observable produces values on the given scheduler.
Some operators let's you specify which scheduler they should run on. When they do, you should always prefer to pass a scheduler, that way you know where they're going to do work and prevent them from potentially blocking the UI thead (although they shouldn't). SubsribeOn()
is kind of a "hack" for observables that doesn't let you specify a scheduler. If you use SubscribeOn()
, but the operator specifies a scheduler, the signals from the operator will be emitted on the operators scheduler, not the one you specified in SubscribeOn()
.
ObserveOn()
does much the same as SubscribeOn()
, but it does it "from this point onwards". The operators and code following ObserveOn()
will execute on the scheduler given to ObserveOn()
. I like to think that ObserveOn()
means "change thread to this one".
If you are going to do heavy work, put that in a function and call that function, like what you've done with LongRunningCalculation()
. You could use a put an ObserveOn(RxApp.TaskPoolScheduler)
before the Select()
and an ObserveOn(RxApp.MainThreadScheduler
after it, but I prefer to use Observable.Start()
combined with SelectMany()
.
Observable.Start()
is basically Observable.Return()
for functions: "Give me the result of this function as an observable." You can also specify the scheduler it should call the function on.
SelectMany()
ensures that we get the result of the observable, instead of the observable itself. (It's kind of like the await
for observables: "don't execute this next operator before we have the result of this observable")
You are doing a derived property correct.
Use WhenAnyValue()
to get the changes of the property and pipe that to a ToProperty()
. The operators you put in between may do work on background threads delay the setting of the derived property, but that's why we have INotifyPropertyChanged
.
Here's how I would implement your specific example:
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
_derived = this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// Sepcify the scheduler to the operator directly
.SelectMany(originalValue =>
Observable.Start(
() => LongRunningCalculation(originalValue),
RxApp.TaskPoolScheduler))
.ObserveOn(RxApp.MainThreadScheduler)
// I prefer this overload of ToProperty, which returns an ObservableAsPropertyHelper
.ToProperty(this, x => x.Derived);
We have a Slack team for ReactiveUI which you are welcome to join. You can ask for an invite by clicking here
Upvotes: 8
Reputation: 32192
Before a Select
that might block the UI observe on the TaskPoolScheduler.
Before the ToProperty
observe on the MainThreadScheduler.
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
.ObserveOn(TaskPoolScheduler.Default)
.Select(derivedValue => LongRunningCalculation(originalValue))
.ObserveOn(RxApp.MainThreadScheduler)
.ToProperty(this, x => x.Derived, out _derived);
People are very confused as to what SubscribeOn
actually does. There are many explanations. For example as given in another answer here
SubscribeOn moves up the observable chain to the top and make sure the observable produces values on the given scheduler
This is just not true. It is instructive to look at the implementation of SubscribeOn
in the RX code base. You have to jump through several layers of abstraction to get there but eventually you find.
public static IObservable<TSource>
SubscribeOn<TSource>
( IObservable<TSource> source
, IScheduler scheduler
)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return (IObservable<TSource>) new AnonymousObservable<TSource>((Func<IObserver<TSource>, IDisposable>) (observer =>
{
SingleAssignmentDisposable assignmentDisposable = new SingleAssignmentDisposable();
SerialDisposable d = new SerialDisposable();
d.Disposable = (IDisposable) assignmentDisposable;
assignmentDisposable.Disposable = scheduler.Schedule((Action) (() => d.Disposable = (IDisposable) new ScheduledDisposable(scheduler, source.SubscribeSafe<TSource>(observer))));
return (IDisposable) d;
}));
}
The only thing this does is ensure that the Subscribe
method on source
is called on the specified scheduler and that the Dispose
method on the disposable returned by the same Subscribe
method is also called on the specified scheduler. The effect that this has on the downstream code is varied.
For example
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace SubscribeOnVsObserveOn
{
class Program
{
static readonly Subject<object> EventsSubject = new Subject<object>();
private static readonly IObservable<object> Events = Observable.Create<object>
( observer =>
{
Info( "Subscribing" );
return EventsSubject.Subscribe( observer );
} );
public static void Info(string msg)
{
var currentThread = Thread.CurrentThread;
var currentThreadName = string.IsNullOrWhiteSpace( currentThread.Name ) ? "<no name>" : currentThread.Name;
Console.WriteLine
( $"Thread Id {currentThread.ManagedThreadId} {currentThreadName} - " + msg );
}
public static void Foo()
{
Thread.CurrentThread.Name = "Main Thread";
Info( "Starting" );
void OnNext(object o) => Info( $"Received {o}" );
void Notify(object obj)
{
Info( $"Sending {obj}" );
EventsSubject.OnNext( obj );
}
void StartAndSend(object o, string threadName)
{
var thread = new Thread(Notify);
thread.Name = threadName;
thread.Start(o);
thread.Join();
}
Notify(1);
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe Only" );
Console.WriteLine("=============================================" );
using (Events.Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(CurrentThreadScheduler)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( CurrentThreadScheduler.Instance ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(ThreadPool)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( ThreadPoolScheduler.Instance ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(NewThread)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( NewThreadScheduler.Default ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(NewThread) + ObserveOn" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( NewThreadScheduler.Default ).ObserveOn(TaskPoolScheduler.Default ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
}
static void Main(string[] args)
{
Foo();
Console.WriteLine( "Press Any Key" );
Console.ReadLine();
}
}
}
generates the following output
Thread Id 1 Main Thread - Starting
Thread Id 1 Main Thread - Sending 1
=============================================
Subscribe Only
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 4 A - Sending 2
Thread Id 4 A - Received 2
Thread Id 5 B - Sending 3
Thread Id 5 B - Received 3
=============================================
Subscribe With SubscribeOn(CurrentThreadScheduler)
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 6 A - Sending 2
Thread Id 6 A - Received 2
Thread Id 7 B - Sending 3
Thread Id 7 B - Received 3
=============================================
Subscribe With SubscribeOn(ThreadPool)
=============================================
Thread Id 8 <no name> - Subscribing
Thread Id 10 A - Sending 2
Thread Id 10 A - Received 2
Thread Id 11 B - Sending 3
Thread Id 11 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread)
=============================================
Thread Id 12 <no name> - Subscribing
Thread Id 13 A - Sending 2
Thread Id 13 A - Received 2
Thread Id 14 B - Sending 3
Thread Id 14 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread) + ObserveOn
=============================================
Thread Id 16 <no name> - Subscribing
Thread Id 17 A - Sending 2
Thread Id 19 B - Sending 3
Thread Id 18 <no name> - Received 2
Thread Id 18 <no name> - Received 3
Press Any Key
The takeaway being that SubscribeOn can neither force the sending or receiving of events to be on a specific scheduler. It can only force the Subscribe
method to occur on a specific scheduler. This may or may not have downstream / upstream effects.
Upvotes: 4