heltonbiker
heltonbiker

Reputation: 27575

What's the appropriate way to use schedulers in derived properties to have a responsive UI?

I'm having a hard time trying to figure the proper way to schedule long-running reactive property "getters" in my ViewModel.

This excerpt from Intro to RX describes exactely what I want to do:

  • respond to some sort of user action
  • do work on a background thread
  • pass the result back to the UI thread
  • update the UI

Only in this case besides user interaction I want to react to change from other properties.

Below is the generic template I am using to get a derived property from an original property (in the actual code, there are chains of cascading derived properties).

In a Reactive ViewModel (inheriting from ReactiveObject) I already have some properties that derive from others. For exemple, when Original changes, Derived is recalculated.

    public TOriginal Original
    {
        get { return _original; }
        set { this.RaiseAndSetIfChanged(ref _original, value); }
    }
    TOriginal _original;


    public TDerived Derived { get { return _derived.Value; } }
    readonly ObservableAsPropertyHelper<double[,]> _derived;


    this.WhenAnyValue(x => x.Original)
        .Where(originalValue => originalValue != null)
        // ObserveOn? SubscribeOn? Which scheduler?
        .Select(derivedValue => LongRunningCalculation(originalValue))
        // Same thing here: ObserveOn? SubscribeOn? Which scheduler? 
        .ToProperty(this, x => x.Derived, out _derived); // should I use the `scheduler:` in this method?

My problems are: I have no idea how these different "design choices" should be combined to get my desired responsive UI:

The most nerve-wrecking fact is that, with some combinations, the calculations work properly, but block the UI, while with some other combinations, the values are asynchronously calculated and the UI is a bit less blocked, but sometimes part of the derived values (in a collection of items, for example) are not available!

Sorry if I'm asking too much, but I didn't find any authoritative expected way to do what I need in the docs.

Upvotes: 5

Views: 2714

Answers (2)

Jon G St&#248;dle
Jon G St&#248;dle

Reputation: 3904

Schedulers

In Rx.NET there are a few schedulers, including a special one that's exclusive to WPF.

  • TaskPoolScheduler runs code on the task pool. This is a bit like running code inside a Task.
  • NewThreadScheduler spawns a new thread to run the code on. Generally don't use this operator, unless you know that you "need" it (you almost never don't)
  • DispatcherScheduler runs code on the UI thread. Use this when you're going to set your properties in the VM

RxUI brings two platform agnostic scheduler abstractions. No matter what platform you're on (WPF, UWP, Xamarin.iOS, Xamarin.Android) RxApp.MainThreadScheduler will always refer to the UI thread scheduler, while RxApp.TaskPoolScheduler will refer to something akin to a background thread.

If you want to keep it simple, just use the RxApp schedulers; RxApp.MainThreadScheduler for UI stuff and RxApp.TaskPoolScheduler for background/heavy duty stuff.

ObserveOn/SubscribeOn

The name SubscribeOn() is a bit confusing as it doesn't directly affect Subscribe() method. SubscribeOn() decides which scheduler the observable will start on; on which scheduler the original/first subscription will be done (not which scheduler the Subscribe() method will execute on). I like to think that SubsribeOn() moves up the observable chain to the top and make sure the observable produces values on the given scheduler.

Some operators let's you specify which scheduler they should run on. When they do, you should always prefer to pass a scheduler, that way you know where they're going to do work and prevent them from potentially blocking the UI thead (although they shouldn't). SubsribeOn() is kind of a "hack" for observables that doesn't let you specify a scheduler. If you use SubscribeOn(), but the operator specifies a scheduler, the signals from the operator will be emitted on the operators scheduler, not the one you specified in SubscribeOn().

ObserveOn() does much the same as SubscribeOn(), but it does it "from this point onwards". The operators and code following ObserveOn() will execute on the scheduler given to ObserveOn(). I like to think that ObserveOn() means "change thread to this one".

Doing heavy work

If you are going to do heavy work, put that in a function and call that function, like what you've done with LongRunningCalculation(). You could use a put an ObserveOn(RxApp.TaskPoolScheduler) before the Select() and an ObserveOn(RxApp.MainThreadScheduler after it, but I prefer to use Observable.Start() combined with SelectMany().

Observable.Start() is basically Observable.Return() for functions: "Give me the result of this function as an observable." You can also specify the scheduler it should call the function on.

SelectMany() ensures that we get the result of the observable, instead of the observable itself. (It's kind of like the await for observables: "don't execute this next operator before we have the result of this observable")

Derived properties

You are doing a derived property correct.

Use WhenAnyValue() to get the changes of the property and pipe that to a ToProperty(). The operators you put in between may do work on background threads delay the setting of the derived property, but that's why we have INotifyPropertyChanged.

My take

Here's how I would implement your specific example:

public TOriginal Original
{
    get { return _original; }
    set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;


public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;


_derived = this.WhenAnyValue(x => x.Original)
    .Where(originalValue => originalValue != null)
    // Sepcify the scheduler to the operator directly
    .SelectMany(originalValue =>
        Observable.Start(
            () => LongRunningCalculation(originalValue),
            RxApp.TaskPoolScheduler))
    .ObserveOn(RxApp.MainThreadScheduler)
    // I prefer this overload of ToProperty, which returns an ObservableAsPropertyHelper
    .ToProperty(this, x => x.Derived);

We have a Slack team for ReactiveUI which you are welcome to join. You can ask for an invite by clicking here

Upvotes: 8

bradgonesurfing
bradgonesurfing

Reputation: 32192

Before a Select that might block the UI observe on the TaskPoolScheduler. Before the ToProperty observe on the MainThreadScheduler.

  this.WhenAnyValue(x => x.Original)
        .Where(originalValue => originalValue != null)
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(derivedValue => LongRunningCalculation(originalValue))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.Derived, out _derived); 

Additionally

People are very confused as to what SubscribeOn actually does. There are many explanations. For example as given in another answer here

SubscribeOn moves up the observable chain to the top and make sure the observable produces values on the given scheduler

This is just not true. It is instructive to look at the implementation of SubscribeOn in the RX code base. You have to jump through several layers of abstraction to get there but eventually you find.

public static IObservable<TSource> 
    SubscribeOn<TSource>
   ( IObservable<TSource> source
   , IScheduler scheduler
   )
{
  if (source == null)
    throw new ArgumentNullException("source");
  if (scheduler == null)
    throw new ArgumentNullException("scheduler");
  return (IObservable<TSource>) new AnonymousObservable<TSource>((Func<IObserver<TSource>, IDisposable>) (observer =>
  {
    SingleAssignmentDisposable assignmentDisposable = new SingleAssignmentDisposable();
    SerialDisposable d = new SerialDisposable();
    d.Disposable = (IDisposable) assignmentDisposable;
    assignmentDisposable.Disposable = scheduler.Schedule((Action) (() => d.Disposable = (IDisposable) new ScheduledDisposable(scheduler, source.SubscribeSafe<TSource>(observer))));
    return (IDisposable) d;
  }));
}

The only thing this does is ensure that the Subscribe method on source is called on the specified scheduler and that the Dispose method on the disposable returned by the same Subscribe method is also called on the specified scheduler. The effect that this has on the downstream code is varied.

For example

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace SubscribeOnVsObserveOn
{
    class Program
    {
        static readonly Subject<object> EventsSubject = new Subject<object>();

        private static readonly IObservable<object> Events = Observable.Create<object>
            ( observer =>
            {
                Info( "Subscribing"  );
                return EventsSubject.Subscribe( observer );
            } );

        public static void Info(string msg)
        {
            var currentThread = Thread.CurrentThread;
            var currentThreadName = string.IsNullOrWhiteSpace( currentThread.Name ) ? "<no name>" : currentThread.Name;
            Console.WriteLine
                ( $"Thread Id {currentThread.ManagedThreadId} {currentThreadName} - " + msg );
        }

        public static void  Foo()
        {
            Thread.CurrentThread.Name = "Main Thread";

            Info( "Starting"  );

            void OnNext(object o) => Info( $"Received {o}" );

            void Notify(object obj)
            {
                Info( $"Sending {obj}"  );
                EventsSubject.OnNext( obj );
            }

            void StartAndSend(object o, string threadName)
            {
                var thread = new Thread(Notify);
                thread.Name = threadName;
                thread.Start(o);
                thread.Join();
            }

            Notify(1);

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe Only" );
            Console.WriteLine("=============================================" );
            using (Events.Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(CurrentThreadScheduler)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( CurrentThreadScheduler.Instance ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(ThreadPool)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( ThreadPoolScheduler.Instance ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(NewThread)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( NewThreadScheduler.Default ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(NewThread) + ObserveOn" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( NewThreadScheduler.Default ).ObserveOn(TaskPoolScheduler.Default  ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }
        }




        static void Main(string[] args)
        {
            Foo();
            Console.WriteLine( "Press Any Key" );
            Console.ReadLine();
        }
    }
}

generates the following output

Thread Id 1 Main Thread - Starting
Thread Id 1 Main Thread - Sending 1
=============================================
Subscribe Only
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 4 A - Sending 2
Thread Id 4 A - Received 2
Thread Id 5 B - Sending 3
Thread Id 5 B - Received 3
=============================================
Subscribe With SubscribeOn(CurrentThreadScheduler)
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 6 A - Sending 2
Thread Id 6 A - Received 2
Thread Id 7 B - Sending 3
Thread Id 7 B - Received 3
=============================================
Subscribe With SubscribeOn(ThreadPool)
=============================================
Thread Id 8 <no name> - Subscribing
Thread Id 10 A - Sending 2
Thread Id 10 A - Received 2
Thread Id 11 B - Sending 3
Thread Id 11 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread)
=============================================
Thread Id 12 <no name> - Subscribing
Thread Id 13 A - Sending 2
Thread Id 13 A - Received 2
Thread Id 14 B - Sending 3
Thread Id 14 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread) + ObserveOn
=============================================
Thread Id 16 <no name> - Subscribing
Thread Id 17 A - Sending 2
Thread Id 19 B - Sending 3
Thread Id 18 <no name> - Received 2
Thread Id 18 <no name> - Received 3
Press Any Key

The takeaway being that SubscribeOn can neither force the sending or receiving of events to be on a specific scheduler. It can only force the Subscribe method to occur on a specific scheduler. This may or may not have downstream / upstream effects.

Upvotes: 4

Related Questions