Krzysztof Skowronek
Krzysztof Skowronek

Reputation: 2936

Schedulers in ReactiveUI testing

So, when I develop new feature for my system, I try too do a TDD - the code is to big to do that for old features right now, sadly.

However, I find that sometimes I hit a brick wall during the tests - especially when using Delay and Throttle.

I did a lot of reading and I think I know much more than week before, but I wanted to put all of this into pracitce. I wrote some experiments:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Threading.Tasks;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Reactive.Testing;
    using NUnit.Framework;
    using NUnit.Framework.Internal.Commands;
    using ReactiveUI;
    using ReactiveUI.Testing;

    namespace UtilsTests
    {
        [TestFixture]
        public class SchedulersTests
        {
            private int SecondsN = 1;

            [Test]
            public async Task NoScheduler()
            {
                var t = Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), RxApp.MainThreadScheduler)
                    .ObserveOn(RxApp.MainThreadScheduler)
                    .ToTask();
                await t;
            }

            [Test]
            public Task ImmediateSchedulerExperiment()
            {
                return Scheduler.Immediate.With(async s =>
                {
                    var t = Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), RxApp.MainThreadScheduler).ToTask();
                    await t;
                });
            }

            [Test]
            public Task ImmediateSchedulerExperiment2()
            {
                return Scheduler.Immediate.With(async s =>
                {
                    var t = Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), s).FirstAsync().ToTask();
                    await t;
                });
            }

            [Test]
            public void ImmediateSchedulerExperiment3()
            {
                Scheduler.Immediate.With(s =>
                {
                    var t = false;
                    Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), s)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    Assert.IsTrue(t);
                });
            }


            [Test]
            public void TestSchedulerExperiment_SchedulersNotSpecified()
            {
                new TestScheduler().With(s =>
                {
                    var t = false;
                    Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), s)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    s.AdvanceByMs(SecondsN * 1000);

                    Assert.IsTrue(t);
                });
            }

            [Test]
            public void TestSchedulerExperiment_DeylaOn_RxMainThread()
            {
                new TestScheduler().With(s =>
                {
                    var t = false;
                    Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), RxApp.MainThreadScheduler)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    s.AdvanceByMs(SecondsN * 1000);

                    Assert.IsTrue(t);
                });
            }

            [Test]
            public void TestSchedulerExperiment_DeylaOn_RxTaskPool()
            {
                new TestScheduler().With(s =>
                {
                    var t = false;
                    Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(SecondsN), RxApp.TaskpoolScheduler)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    s.AdvanceByMs(SecondsN * 1000);

                    Assert.IsTrue(t);
                });
            }

            [Test]
            public void TestSchedulerExperiment_RunOnTaskPool_ObserveOnMainThread()
            {
                new TestScheduler().With(s =>
                {
                    var t = false;
                    Observable.Return(Unit.Default)
                        .Delay(TimeSpan.FromSeconds(SecondsN), RxApp.TaskpoolScheduler)
                        .ObserveOn(RxApp.MainThreadScheduler)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    s.AdvanceByMs(SecondsN * 1000);

                    Assert.IsTrue(t);
                });
            }

            [Test]
            public void TestSchedulerExperiment_RunOnTaskPool_ObserveOnTaskpool()
            {
                new TestScheduler().With(s =>
                {
                    var t = false;
                    Observable.Return(Unit.Default)
                        .Delay(TimeSpan.FromSeconds(SecondsN), RxApp.TaskpoolScheduler)
                        .ObserveOn(RxApp.TaskpoolScheduler)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    s.AdvanceByMs(SecondsN * 1000);
                    s.AdvanceByMs(1);

                    Assert.IsTrue(t);
                });
            }

            [Test]
            public void TestSchedulerExperiment_RunOnTaskPool_ObserveOnMainThread_MainThreadIsAnotherInstance()
            {
                new TestScheduler().With(s =>
                {
                    var mainThreadScheduler = new TestScheduler();
                    RxApp.MainThreadScheduler = mainThreadScheduler;
                    var t = false;
                    Observable.Return(Unit.Default)
                        .Delay(TimeSpan.FromSeconds(SecondsN), RxApp.TaskpoolScheduler)
                        .ObserveOn(RxApp.MainThreadScheduler)
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

                    s.AdvanceByMs(SecondsN * 1000);
                    mainThreadScheduler.AdvanceBy(1);
                    Assert.IsTrue(t);
                });
            }

            [Test]
            public void TestSchedulerExperiment_RunOnTest_ObserveOnTest()
            {
                new TestScheduler().With(s =>
                {
                    var t = false;
                    var obs = Observable.Return(Unit.Default)
                        .Delay(TimeSpan.FromSeconds(SecondsN), s)
                        .ObserveOn(s);
                    obs
                        .Subscribe(_ =>
                        {
                            t = true;
                        });

    //                s.AdvanceByMs(SecondsN * 1000);
    //                s.AdvanceBy(1);

                    s.AdvanceUntil(obs);

                    Assert.IsTrue(t);
                });
            }
        }


    }

At first, I thought that Scheduler.Immediate will do the trick, executing things after delay right on the spot, and boy, that's wrong. I found this article, which explained things rather nicely. I found also this post, explaining which operator uses which scheduler.

I know now, that when playing with time, I should use TestScheduler. Otherwise, don't change the schedulers.

I know now, that you DO NOT do anything async in contructor, instead you create a command called let's say Init that does that on activation and you can await it in a test (for example delayd collection creation based on constructor argument to allow smooth UI animations when the view is comples)

BUT, when I run those tests from above, I get that:

Test results

There are few things I do not understand.

1) Why with Scheduler.Immediate the tests take twice the time? I think I get why Take(1) does not make difference, but still...

2) When using TestSchduler, how do I determine how much to step forward?

I noticed that in test TestSchedulerExperiment_RunOnTest_ObserveOnTest I have to do additional AdvanceBy(1), becuase it's also the observer. So, when the chain is longer, has more observers, it's really hard to count them.

Is it common practice to do scheduler.AdvanceBy(10000000000000);?

I tried to create AdvanceUntil extension, but I know it sucks for many reasons (cold observables for example).

 public static void AdvanceUntil<TIgnore>(this TestScheduler s, IObservable<TIgnore> obs, double? advanceByMs = null)
        {
            var done = false;
            obs.Subscribe(_ => done = true, (ex) => done = true, () => done = true);

            while(!done)
                s.AdvanceByMs(advanceByMs ?? 100);
        }

Or maybe there is a "flush" method that I don't know?

Also, I learned to await stuff inside the TestScheduler.With:

    [Test]
    public Task TestSchedulerExperiment_await()
    {
        return new TestScheduler().With(async s =>
        {
            var v = false;

            var t = Observable.Return(true).Delay(TimeSpan.FromSeconds(SecondsN), s)
                .Take(1) // without hits the test never ends
                .ToTask();
            s.AdvanceByMs(SecondsN * 1000);
            v = await t;

            Assert.IsTrue(v);
        });

but I still need to know the time.

And why there has to be Take(1)?

Upvotes: 2

Views: 1102

Answers (1)

Colt Bauman
Colt Bauman

Reputation: 662

scheduler.Start() executes everything that has been scheduled, so you don't need that extension method.

I recommend not mixing async/await with Rx most of the time, especially for time-based functionality, which is basically all of your tests because of the Delay operator. Otherwise, you could potentially be waiting minutes for a single test to complete. So async/await serves no purpose in any of them.

For example, in a scenario like your TestSchedulerExperiment await test, the test scheduler along with a subscription is all you need. That test would simply become:

// Passing test    
[Test]
public void TestSchedulerExperiment()
{
    new TestScheduler().With(s =>
    {
        var v = false;

        Observable
            .Return(true)
            .Delay(TimeSpan.FromSeconds(1), s)
            .Subscribe(_ => v = true);

        s.Start();
        Console.WriteLine("Scheduler clock value: {0}", s.Clock);

        Assert.True(v);
    });
}

Why with Scheduler.Immediate the tests take twice the time?

If you really want to delve in and see what's happening under the hood I highly recommend this Spy extension by James and add timestamps.

var t = Observable
    .Return(Unit.Default).Spy("Return")
    .Delay(TimeSpan.FromSeconds(2), RxApp.MainThreadScheduler).Spy("Delay")
    .ToTask();
await t;

// Partial output
Return: OnNext(()) on Thread: 1, 23:22:41.2631845
Delay: OnNext(()) on Thread: 1, 23:22:43.2891836
Return: OnCompleted() on Thread: 1, 23:22:43.2921808
Delay: OnCompleted() on Thread: 1, 23:22:45.2958130

Return uses ImmediateScheduler and as you may know, RxApp.MainThreadScheduler = ImmediateScheduler in a unit test runner. Because this scheduler is synchronous Return and Delay notifications both have to wait on each other. Return can't fire its OnCompleted until Delay fires OnNext, and then Delay's OnCompleted notification delays for another 2 seconds.

Upvotes: 5

Related Questions