Anton Zhuraulevich

Reputation: 21

Rx: how to pass the latest value from an observable into a ReactiveCommand

I want to get the latest value of IdStream and use it in the command's Execute action.

public IObservable<Option<Guid>> IdStream { get; }

IdStream = documentStream.OfType<DocumentOpened>().Select(x => x.Document.Id.Some())
.Merge(documentStream.OfType<DocumentClosed>().Select(x => Option<Guid>.None()));

var saveCommand = ReactiveCommand.Create(() => Save(id), CanExecute);

I tried the approach from the answer https://stackoverflow.com/a/31168822/7779560 and ended up with something like this:

var saveCommand = ReactiveCommand.Create(() => { }, CanExecute);
saveCommand.WithLatestFrom(IdStream, (_, id) => id)
            .Subscribe(id => Save(id));

This works, but then I can't use the command's IsExecuting and ThrownExceptions functionality: they only reflect the empty action I passed as Execute when creating the command.

UPD:

Execution order:

  1. IdStream is created
  2. The command is created
  3. documentStream processes a DocumentOpened event (IdStream gets an Id value; I checked this)
  4. saveCommand is executed

How can I achieve this?

UPD 2: I also need to await methods inside the command body (SaveAsync, for example).

Upvotes: 2

Views: 1750

Answers (3)

Kris McGinnes

Reputation: 392

You can store the most recent value of your stream in an ObservableAsPropertyHelper<> property and use it in your command.

Your class level properties would look like this:

public IObservable<Option<Guid>> IdStream { get; }

private ObservableAsPropertyHelper<Option<Guid>> _currentId;
public Option<Guid> CurrentId => _currentId.Value;

And your constructor would wire things up like this:

IdStream.ToProperty(this, x => x.CurrentId, out _currentId);
var saveCommand = ReactiveCommand.Create(() => Save(CurrentId), CanExecute);

You might want to provide a default value for the CurrentId property. You can do that in the ToProperty() call.
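As a rough sketch of how this could cover both the default value and the need to await SaveAsync (UPD 2 in the question): the class name DocumentViewModel, the Guid? stand-in for Option<Guid>, and the placeholder SaveAsync body below are assumptions for illustration, not part of the original code. ReactiveCommand.CreateFromTask is used here in place of Create so that the awaited task is the command's execution.

```csharp
using System;
using System.Reactive;
using System.Threading.Tasks;
using ReactiveUI;

// Hypothetical view model; Guid? stands in for Option<Guid>.
public class DocumentViewModel : ReactiveObject
{
    private readonly ObservableAsPropertyHelper<Guid?> _currentId;

    public DocumentViewModel(IObservable<Guid?> idStream, IObservable<bool> canExecute)
    {
        // initialValue is what CurrentId returns until idStream emits.
        idStream.ToProperty(this, x => x.CurrentId, out _currentId, initialValue: null);

        // CreateFromTask makes the returned Task the command's execution.
        SaveCommand = ReactiveCommand.CreateFromTask(() => SaveAsync(CurrentId), canExecute);
    }

    public Guid? CurrentId => _currentId.Value;

    public ReactiveCommand<Unit, Unit> SaveCommand { get; }

    // Placeholder for the real save logic (assumption).
    private Task SaveAsync(Guid? id) => Task.CompletedTask;
}
```

Because the task itself is what the command executes, IsExecuting and ThrownExceptions should reflect the save rather than an empty action.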

Upvotes: 0

bradgonesurfing

Reputation: 32162

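You want to use Observable.Sample. Each time the sampler (here, the command) fires, Sample emits the most recent value of the source, provided the source has produced a new value since the previous sample.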

    [Fact]
    public void ExecuteUsingLastProducedValue()
    {
        Subject<string> producer = new Subject<string>();
        IObservable<bool> CanExecute = Observable.Return(true);
        IObservable<string> IdStream = producer;
        string SaveCalledWith = String.Empty;

        Func<string, Task> SaveAsync = (id) =>
        {
            SaveCalledWith = id;
            return Task.Delay(0);
        };

        // IdStream creating
        var connectedIdStream =
            IdStream
                .Replay(1);

        connectedIdStream
            .Connect();

        //Command creating
        var command = ReactiveCommand.Create(() => { } , CanExecute);
        connectedIdStream.Sample( command )
                         .Subscribe( id => SaveAsync(id) );

        //documentStream processes DocumentOpened event (get some Id value - I checked it)
        producer.OnNext("something random");
        producer.OnNext("working");

        //At this point Save still hasn't been called, so just verify it's still empty
        SaveCalledWith.Should().Be( String.Empty );

        //trigger execution of command
        command.Execute(Unit.Default).Subscribe();

        //Verify that Save was called with the latest value
        SaveCalledWith.Should().Be( "working");
    }

(I rewrote this with xUnit because I had it on hand.)

Here is a slightly simplified and extended test case with some of your code replaced by what I would recommend.

    [Fact]
    public void ExecuteUsingLastProducedValue()
    {
        var producer = new Subject<string>();
        var canExecute = Observable.Return(true);
        var saveCalledWith = String.Empty;

        void Save(string id) => saveCalledWith = id;

        var rcommand = ReactiveCommand.Create(() => { } , canExecute);

        // When cast to ICommand, ReactiveCommand has a
        // more convenient Execute method. No need
        // to Subscribe.
        var command = (ICommand) rcommand; 


        producer
            .Sample( rcommand )
            .Subscribe( Save );

        //documentStream processes DocumentOpened event (get some Id value - I checked it)
        producer.OnNext("something random");
        producer.OnNext("working");

        //At this point Save still hasn't been called, so just verify it's still empty
        saveCalledWith.Should().Be( String.Empty );

        //trigger execution of command
        command.Execute( Unit.Default );

        //Verify that Save was called with the latest value
        saveCalledWith.Should().Be( "working");

        command.Execute( Unit.Default );

        saveCalledWith.Should().Be( "working");

        producer.OnNext("cat");
        saveCalledWith.Should().Be( "working");
        command.Execute( Unit.Default );
        saveCalledWith.Should().Be( "cat");
        producer.OnNext("dog");
        saveCalledWith.Should().Be( "cat");
        command.Execute( Unit.Default );
        saveCalledWith.Should().Be( "dog");
    }

Upvotes: 0

Shane Neuville

Reputation: 2177

Does this work for you? Replay(1) retains the latest value published. When the command is executed, it grabs that value; Take(1) unsubscribes afterwards because you only need one value, and the value is then passed to Save:

    [Test]
    public void ExecuteUsingLastProducedValue()
    {
        Subject<string> producer = new Subject<string>();
        IObservable<bool> CanExecute = Observable.Return(true);
        IObservable<string> IdStream = producer;
        string SaveCalledWith = String.Empty;

        Func<string, Task> SaveAsync = (id) =>
        {
            SaveCalledWith = id;
            return Task.Delay(0);
        };

        // IdStream creating
        var connectedIdStream =
            IdStream
            .Replay(1);

        connectedIdStream
            .Connect();

        //Command creating
        var command = ReactiveCommand.CreateFromObservable(() =>
        {
            return connectedIdStream
                .Take(1)
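                // Note: Do takes an Action, so this async lambda is async void
                // and the command does not wait for SaveAsync to finish.
                .Do(async id =>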
                {
                    await SaveAsync(id);
                });
        }
        , CanExecute);


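        //Alternate way: SelectMany subscribes to the save, so the command
        //finishes only after SaveAsync does
        //(ToObservable needs using System.Reactive.Threading.Tasks)
        //Command creating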
        //var command = ReactiveCommand.CreateFromObservable(() =>
        //{
        //    return connectedIdStream
        //        .Take(1)
        //        .SelectMany(id => SaveAsync(id).ToObservable());
        //}
        //, CanExecute);


        //documentStream processes DocumentOpened event (get some Id value - I checked it)
        producer.OnNext("something random");
        producer.OnNext("working");

        //At this point Save still hasn't been called, so just verify it's still empty
        Assert.AreEqual(String.Empty, SaveCalledWith);

        //trigger execution of command
        command.Execute(Unit.Default).Subscribe();

        //Verify that Save was called with the latest value
        Assert.AreEqual("working", SaveCalledWith);
    }
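The commented-out alternative above could be pulled out of the test roughly as follows. This is only a sketch: the names SaveCommandSketch and Create are hypothetical, and Observable.FromAsync is swapped in for Task.ToObservable so that the save starts when the command subscribes.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using ReactiveUI;

// Hypothetical helper, not part of ReactiveUI.
public static class SaveCommandSketch
{
    public static ReactiveCommand<Unit, Unit> Create(
        IObservable<string> idStream,
        Func<string, Task> saveAsync,
        IObservable<bool> canExecute)
    {
        // Replay(1) caches the latest id for subscribers that arrive later.
        var latestId = idStream.Replay(1);
        latestId.Connect(); // the returned IDisposable is ignored in this sketch

        // Each execution takes the cached id and runs the save as part of
        // the command's observable, so the command completes with the save.
        return ReactiveCommand.CreateFromObservable(
            () => latestId
                .Take(1)
                .SelectMany(id => Observable.FromAsync(() => saveAsync(id))),
            canExecute);
    }
}
```

Since the save is part of the observable the command executes, IsExecuting should stay true until SaveAsync finishes, and a failing save should surface through ThrownExceptions. One caveat: if idStream has not produced a value yet, Take(1) waits for the first one, so the command stays in the executing state until an id arrives.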

Upvotes: 2
