resp78

Reputation: 1534

Controlling (start/stop) IObservable through ToggleSwitch using ReactiveUI

I am trying to control a stream (IObservable) with a ToggleSwitch in a UWP project. The expectation is that streaming starts when the switch is in the On state and stops when it is in the Off state.

So the thought is to (1) create two commands, one to start the stream and another to stop it, and (2) create two observables that monitor the switch state and call InvokeCommand when the condition is right.

ViewModel

public class MainPageViewModel : ViewModelBase
{
    public ReactiveCommand<Unit, (long, float)> StreamCommand { get; }
    public ReactiveCommand<Unit, Unit> StopCommand { get; }
    public IObservable<(long, float)> FlowStream { get; set; }

    private bool _isStreamOn;

    public bool IsStreamOn
    {
        get => _isStreamOn;
        set => this.RaiseAndSetIfChanged(ref _isStreamOn, value);
    }

    public MainPageViewModel()
    {
        var stream = GetStream();

        var canSwitchOn = this.WhenAnyValue(x => x.IsStreamOn);
        var canSwitchOff = this.WhenAnyValue(x => x.IsStreamOn, isOn => isOn != true);

        FlowStream = StreamCommand = ReactiveCommand.CreateFromObservable(
            () =>
                {
                    stream.Start();
                    return Observable.FromEventPattern<StreamDataEventArgs<(long, INumeric, INumeric, INumeric)>>(
                            h => stream.DataAvailable += h,
                            h => stream.DataAvailable -= h)
                        .SelectMany(e => e.EventArgs.Data)
                        .Select(item => item);
                }, canSwitchOn);

        StopCommand = ReactiveCommand.Create(
            () =>
            {
                stream.Stop();
                IsStreamOn = false;
            }, canSwitchOff);

        canSwitchOff.InvokeCommand(StopCommand);
        canSwitchOn.InvokeCommand(StreamCommand);
    }

}

View

public sealed partial class MainPage : Page, IViewFor<MainPageViewModel>
{
    public MainPage()
    {
        InitializeComponent();
        NavigationCacheMode = Windows.UI.Xaml.Navigation.NavigationCacheMode.Enabled;
        ViewModel = new MainPageViewModel();

        this.WhenActivated(subscription =>
        {
            subscription(this.OneWayBind(this.ViewModel,
                vm => vm.StreamCommand,
                v => v.chart.SeriesCollection[0].Stream)); // Chart take care of displaying data

            subscription(this.Bind(this.ViewModel,
                vm => vm.IsStreamOn,
                v => v.streamToggle.IsOn));
        });
    }

    object IViewFor.ViewModel
    {
        get { return ViewModel; }
        set { ViewModel = (MainPageViewModel)value; }
    }

    public MainPageViewModel ViewModel
    {
        get { return (MainPageViewModel)GetValue(ViewModelProperty); }
        set { SetValue(ViewModelProperty, value); }
    }

    public static readonly DependencyProperty ViewModelProperty =
        DependencyProperty.Register("ViewModel", typeof(MainPageViewModel), typeof(MainPage), null);
}

However, the InvokeCommand calls fail, because they require the ReactiveCommands to take a bool parameter instead of Unit. Any idea how I can invoke a command when certain conditions are met?

Upvotes: 3

Views: 456

Answers (2)

Enigmativity

Reputation: 117064

If you want to turn a stream (IObservable<(long, float)> FlowStream) on and off based on an IObservable<bool> IsStreamOn observable, then you can do this:

IObservable<(long, float)> outputStream =
    IsStreamOn
        .Select(flag => flag ? FlowStream : Observable.Never<(long, float)>())
        .Switch();

So each time IsStreamOn produces true you start getting values from FlowStream, and each time it produces false the values stop.

This assumes that FlowStream is hot. If not, do this:

IObservable<(long, float)> outputStream =
    FlowStream
        .Publish(fs =>
            IsStreamOn
                .Select(flag => flag ? fs : Observable.Never<(long, float)>())
                .Switch());

Here's a simple test:

void Main()
{
    IObservable<long> outputStream =
        FlowStream
            .Publish(fs =>
                IsStreamOn
                    .Select(flag => flag ? fs : Observable.Never<long>())
                    .Switch());

    using (outputStream.Subscribe(Console.WriteLine))
    {
        IsStreamOn.OnNext(true);
        Thread.Sleep(TimeSpan.FromSeconds(2.5));
        IsStreamOn.OnNext(false);
        Thread.Sleep(TimeSpan.FromSeconds(3.0));
        IsStreamOn.OnNext(true);
        Thread.Sleep(TimeSpan.FromSeconds(3.0));
    }

}

IObservable<long> FlowStream = Observable.Interval(TimeSpan.FromSeconds(1.0));
Subject<bool> IsStreamOn = new Subject<bool>();

This produces:

0
1
5
6
7
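
To see why the hot/cold distinction matters, here is a minimal sketch of the first version (no Publish) run against a cold source. Observable.Range stands in for FlowStream, and the ColdSwitchDemo class and Run method are names made up for this illustration. Because a cold observable restarts for every subscriber, each true re-subscribes and the values begin again from the start instead of carrying on.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ColdSwitchDemo
{
    // Hypothetical helper: collects what the switched stream emits.
    public static List<int> Run()
    {
        // Cold source: every new subscription replays 0, 1, 2.
        IObservable<int> flowStream = Observable.Range(0, 3);
        var isStreamOn = new Subject<bool>();
        var seen = new List<int>();

        // Same shape as the first snippet above, without Publish.
        IObservable<int> outputStream =
            isStreamOn
                .Select(flag => flag ? flowStream : Observable.Never<int>())
                .Switch();

        using (outputStream.Subscribe(seen.Add))
        {
            isStreamOn.OnNext(true);   // subscribes to flowStream
            isStreamOn.OnNext(false);  // switches to Never
            isStreamOn.OnNext(true);   // subscribes again, so the source restarts
        }

        return seen;
    }
}
```

Calling ColdSwitchDemo.Run() should show the source values repeated once per true, which is the restart that the Publish version avoids by sharing a single subscription.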

Given the comments about actually calling .Start() and .Stop(), try something like this:

IObservable<(long, float)> outputStream =
    Observable
        .Create<(long, float)>(o =>
        {
            var stream = GetStream();
            return 
                FlowStream
                    .Publish(fs =>
                        IsStreamOn
                            .Do(flag => { if (flag) stream.Start(); else stream.Stop(); })
                            .Select(flag => flag ? fs : Observable.Never<(long, float)>())
                            .Switch())
                    .Subscribe(o);
        });

Upvotes: 3

Glenn Watson

Reputation: 2888

In scenarios like this I tend to project the observable to Unit:

var canSwitchOn = this.WhenAnyValue(x => x.IsStreamOn).Select(_ => Unit.Default);

That way the bool is not passed along to the command.

Also, you may want a Where() clause in this case, so that each command is triggered only when its condition holds.

e.g.

var switchOn = this.WhenAnyValue(x => x.IsStreamOn).Where(x => x).Select(_ => Unit.Default);
var switchOff = this.WhenAnyValue(x => x.IsStreamOn).Where(x => !x).Select(_ => Unit.Default);
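
Putting those two observables to use might look like the sketch below. It is only an illustration under some assumptions: ToggleViewModel is a made-up class, Observable.Interval stands in for the real data stream, and the TakeUntil(StopCommand) call is an addition (not part of the question's code) so that the running StreamCommand ends when StopCommand fires. Note that WhenAnyValue emits the current value immediately, so switchOff invokes StopCommand once at construction.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using ReactiveUI;

// Hypothetical view model, reduced to the toggle and the two commands.
public class ToggleViewModel : ReactiveObject
{
    private bool _isStreamOn;

    public bool IsStreamOn
    {
        get => _isStreamOn;
        set => this.RaiseAndSetIfChanged(ref _isStreamOn, value);
    }

    public ReactiveCommand<Unit, long> StreamCommand { get; }
    public ReactiveCommand<Unit, Unit> StopCommand { get; }

    public ToggleViewModel()
    {
        // Placeholder body; the real command would call stream.Stop().
        StopCommand = ReactiveCommand.Create(() => { });

        // Assumption: Interval stands in for the device stream.
        // TakeUntil ends the execution when StopCommand produces a value.
        StreamCommand = ReactiveCommand.CreateFromObservable(
            () => Observable.Interval(TimeSpan.FromSeconds(1))
                            .TakeUntil(StopCommand));

        // Filter on the toggle state, then drop the bool so the
        // element type matches the commands' Unit parameter.
        var switchOn = this.WhenAnyValue(x => x.IsStreamOn)
            .Where(on => on)
            .Select(_ => Unit.Default);
        var switchOff = this.WhenAnyValue(x => x.IsStreamOn)
            .Where(on => !on)
            .Select(_ => Unit.Default);

        switchOn.InvokeCommand(StreamCommand);
        switchOff.InvokeCommand(StopCommand);
    }
}
```

With this shape, binding the ToggleSwitch to IsStreamOn is enough to drive both commands, since the element type of each source observable now matches the command's Unit parameter.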

Upvotes: 1
