wuLiao

Reputation: 5

Subscription to an observable not running on the expected thread/dispatcher

While learning ReactiveUI, I wrote a simple timer module to drive a countdown display. To keep the UI responsive, I made the countdown asynchronous; the code is provided below. However, the code does not behave as expected: specifically, the subscription does not run on the expected thread/scheduler.

I created timeoutObservable to produce a series of TimeSpan values, then subscribed to it with a simple lambda expression that sets a property bound to a TextBlock control. I used SubscribeOn(RxApp.MainThreadScheduler) to ensure the subscription code runs on the main/dispatcher thread.

WndMainVm.cs

    public class WndMainVm : ReactiveObject
    {
        public WndMainVm()
        {
            ButtonDisplayString = $"Play! (Timeout: {GameTimeout.TotalSeconds}s)";
            StartGameCommand = ReactiveCommand.CreateFromTask(async _ =>
            {
                IsGameStarted = true;
                TimeLeft = GameTimeout;
                var lastRecordTime = DateTime.Now;
                await GameControlInteraction.StartGame.Handle(Unit.Default);
                var timeoutObservable = Observable
                    .Interval(UpdateInterval)
                    .Select(l =>
                    {
                        var newLastRecordTime = DateTime.Now;
                        var newTimeLeft = TimeLeft - (newLastRecordTime - lastRecordTime);
                        lastRecordTime = newLastRecordTime;
                        return newTimeLeft;
                    })
                    .Merge(Observable
                        .Timer(GameTimeout)
                        .Select(l => TimeSpan.Zero))
                    .TakeUntil(ts => ts == TimeSpan.Zero);
                timeoutObservable.
                    SubscribeOn(RxApp.MainThreadScheduler).
                    Subscribe(ts => 
                        TimeLeft = ts);
                await timeoutObservable;
                await GameControlInteraction.StopGame.Handle(Unit.Default);
                IsGameStarted = false;
            }, this.WhenAnyValue(x => x.IsGameStarted).Select(v => !v));

            this.WhenAnyValue(x => x.TimeLeft)
                .Select(v => $"Time left: {v.TotalMilliseconds}ms")
                .ToProperty(this, x => x.TimeoutDisplayString, out _timeoutDisplayString, scheduler: RxApp.MainThreadScheduler);
        }

        private readonly ObservableAsPropertyHelper<string> _timeoutDisplayString;

        public TimeSpan GameTimeout { get; } = TimeSpan.FromSeconds(10);

        public TimeSpan UpdateInterval { get; } = TimeSpan.FromMilliseconds(10);

        [Reactive]
        public bool IsGameStarted { get; set; }

        [Reactive]
        public TimeSpan TimeLeft { get; set; }

        [Reactive] 
        public string ButtonDisplayString { get; set; }

        public string TimeoutDisplayString => _timeoutDisplayString.Value;

        public ReactiveCommand<Unit, Unit> StartGameCommand { get; }
    }

WndMain.cs

    public partial class WndMain : ReactiveWindow<WndMainVm>
    {
        public WndMain()
        {
            InitializeComponent();
            ViewModel = new WndMainVm();
            this.WhenActivated(d =>
            {
                this.OneWayBind(ViewModel, x => x.ButtonDisplayString, x => x.BtnPlayStop.Content).DisposeWith(d);
                this.OneWayBind(ViewModel, x => x.TimeoutDisplayString, x => x.TbkTimeDisplay.Text).DisposeWith(d); // CountDownDisplay

                this.BindCommand(ViewModel, x => x.StartGameCommand, x => x.BtnPlayStop).DisposeWith(d);
            });
        }
    }

However, when testing the code, I found that the subscription code always runs on a thread-pool thread, causing a TargetInvocationException. I know this happens when you try to change a property of a control from a thread other than the main/dispatcher thread, so I wonder whether something in my code prevents it from executing on the right thread. For now I have worked around the problem by creating a dependent property, TimeoutDisplayString, which works fine, but the underlying problem still puzzles me and I would really like to understand why it happens.

I'm not very familiar with the async/await keywords, so my guess is that I didn't use them correctly. Could anyone kindly take a look, point out my error, or suggest a better countdown solution?

Upvotes: 0

Views: 862

Answers (1)

Rodney Littles

Reputation: 564

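Generally, when we think we should use SubscribeOn, we should actually be using ObserveOn. ObserveOn controls the scheduler on which the handler passed to Subscribe is invoked, which is what matters when that handler updates a UI-bound property.

From the ReactiveX documentation:

"The SubscribeOn operator is similar, but it instructs the Observable to itself operate on the specified Scheduler, as well as notifying its observers on that Scheduler."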

http://reactivex.io/documentation/operators/observeon.html
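
To see the difference outside of a UI, here is a minimal console sketch. It is not the code from the question: it assumes the System.Reactive package and uses an EventLoopScheduler as a hypothetical stand-in for RxApp.MainThreadScheduler, and the SchedulerDemo and Run names are made up for illustration. It records which thread the handler runs on for each operator.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerDemo
{
    // Returns (handler ran on loop thread with SubscribeOn,
    //          handler ran on loop thread with ObserveOn).
    public static (bool, bool) Run()
    {
        // Assumption: a scheduler that owns one dedicated thread,
        // standing in for the UI/dispatcher scheduler.
        using var loop = new EventLoopScheduler();

        // Record the id of the scheduler's own thread.
        int loopThreadId = -1;
        using (var ready = new ManualResetEventSlim())
        {
            loop.Schedule(() =>
            {
                loopThreadId = Environment.CurrentManagedThreadId;
                ready.Set();
            });
            ready.Wait();
        }

        // Interval raises its ticks on its own default scheduler,
        // regardless of which thread performed the subscription.
        var ticks = Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(1);

        // SubscribeOn: only the Subscribe call is moved to the loop thread.
        int viaSubscribeOn = ticks
            .SubscribeOn(loop)
            .Select(_ => Environment.CurrentManagedThreadId)
            .Wait();

        // ObserveOn: every notification is marshalled to the loop thread.
        int viaObserveOn = ticks
            .ObserveOn(loop)
            .Select(_ => Environment.CurrentManagedThreadId)
            .Wait();

        return (viaSubscribeOn == loopThreadId, viaObserveOn == loopThreadId);
    }

    public static void Main()
    {
        var (subscribeOnHit, observeOnHit) = Run();
        Console.WriteLine($"SubscribeOn handler on loop thread: {subscribeOnHit}");
        Console.WriteLine($"ObserveOn handler on loop thread: {observeOnHit}");
    }
}
```

The SubscribeOn line should report False and the ObserveOn line True. Applied to the question's code, the equivalent change would be replacing SubscribeOn(RxApp.MainThreadScheduler) with ObserveOn(RxApp.MainThreadScheduler) before the Subscribe call.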

Upvotes: 2
