Flack

Reputation: 5899

Rx to trigger an action after a certain amount of time

I have a class that has a couple of bool properties and is subscribed to an observable that supplies objects that contain various values (at an indeterminate pace). For example:

bool IsActive {get; private set;}
bool IsBroken {get; private set;}
bool Status {get; private set;}
...
valueStream.GetValues().Subscribe(UpdateValues);

UpdateValues does some work based on the object passed in. There is one value in particular that I am interested in using for some specific logic though. Let's call it obj.SpecialValue:

private void UpdateValues(MyObject obj)
{
  ...
  Status = obj.SpecialValue;
  ...
}

Now, if IsActive is set to true and Status is false, I want to wait three seconds before setting IsBroken to true, to give the stream a chance to return an obj.SpecialValue of true in that time. If it does return true within three seconds, I do nothing; otherwise I set IsBroken to true. Whenever either Status or IsActive is updated, the check should run again.

At first I had:

// Timer emits a long tick count, so wrap the parameterless method in a lambda
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(3)).Subscribe(_ => SetIsBroken());

private void SetIsBroken()
{
  IsBroken = IsActive && !Status;
}

but that does more checking than it needs to. It only really needs to check when the stream updates or IsActive is changed.

Any tips on how to do this the right way?

Upvotes: 3

Views: 1792

Answers (1)

James World

Reputation: 29806

Using BehaviorSubject<T> to back properties

A useful idea for this problem is to back your properties with BehaviorSubject<bool> types. These serve the dual purpose of acting both as a property and as a stream of that property's values.

You can subscribe to them as observables, but also access their current value through the Value property. You change them by sending a new value via OnNext.

For example, we could do this:

private BehaviorSubject<bool> _isActive;

public bool IsActive
{
    get { return _isActive.Value; }
    set { _isActive.OnNext(value); }
}

With this in place across all your properties, it becomes a fairly simple exercise to watch the properties for the complex condition you state. Assuming _status and _isBroken are similarly implemented backing subjects, we can set up a subscription like this:

Observable.CombineLatest(_isActive,
                         _status,
                        (a,s) => a & !s).DistinctUntilChanged()
          .Where(p => p)
          .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
                                     .TakeUntil(_status.Where(st => st)))
          .Subscribe(_ => _isBroken.OnNext(true));  

The first line uses CombineLatest to subscribe to the _isActive and _status streams. It emits whenever either of these changes, and the result function yields true precisely when _isActive is true and _status is false. The DistinctUntilChanged() then drops any update that leaves this combined result unchanged, so setting _isActive or _status to a value it already has does not start a new timer.

Then we use Where to filter for this condition only.

The SelectMany now takes each true value and projects it into a stream that emits after 3 seconds, using Timer - but we use TakeUntil to squash this value in the event that _status becomes true. SelectMany also flattens the stream of streams back down to a single stream (of long values, since that is what Timer emits; the value itself is ignored).

Not sure here - you didn't mention it, but you may want to think about whether _isActive going false should also terminate the timer. If this is the case, you can use Merge to combine a watch for this and _status in the TakeUntil.
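As a rough sketch of that Merge idea (not part of the original query): the cancel signal below fires when _status turns true or _isActive turns false. The class name CancelOnInactiveDemo, the local variable names, and the use of TestScheduler.AdvanceBy to drive virtual time are assumptions made for illustration. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not from the original answer.
public static class CancelOnInactiveDemo
{
    // Returns IsBroken as seen after a cancelled timer and after an expired one.
    public static (bool AfterCancel, bool AfterExpiry) Run()
    {
        var scheduler = new TestScheduler();
        var isActive = new BehaviorSubject<bool>(false);
        var status = new BehaviorSubject<bool>(false);
        var isBroken = new BehaviorSubject<bool>(false);

        // Cancel signal: status turns true OR isActive turns false.
        var cancel = status.Where(st => st)
                           .Merge(isActive.Where(a => !a));

        var subscription = Observable
            .CombineLatest(isActive, status, (a, s) => a && !s)
            .DistinctUntilChanged()
            .Where(p => p)
            .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
                                       .TakeUntil(cancel))
            .Subscribe(_ => isBroken.OnNext(true));

        isActive.OnNext(true);                               // timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks);  // 2s of virtual time
        isActive.OnNext(false);                              // timer is cancelled
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
        var afterCancel = isBroken.Value;

        isActive.OnNext(true);                               // a fresh timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(3).Ticks);  // and is left to expire
        var afterExpiry = isBroken.Value;

        subscription.Dispose();
        return (afterCancel, afterExpiry);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine(result.AfterCancel);  // False
        Console.WriteLine(result.AfterExpiry);  // True
    }
}
```

Because a BehaviorSubject replays its current value on subscription, the two Where filters matter: at the moment the timer starts, _status is false and _isActive is true, so neither replayed value cancels it.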

We can subscribe to this whole thing now to set _isBroken true should this query ever fire, indicating the timer expired.

Note the scheduler argument to Timer - this exists so we can pass in a test scheduler.

I'm not sure if I've captured all your logic correctly - but if not hopefully you can see how to amend as necessary.

Here is the complete example. Use nuget package rx-testing and this will run in LINQPad as written:

void Main()
{
    var tests = new Tests();
    tests.Test();
}

public class Foo
{
    private BehaviorSubject<bool> _isActive;
    private BehaviorSubject<bool> _isBroken;
    private BehaviorSubject<bool> _status;

    public bool IsActive
    {
        get { return _isActive.Value; }
        set { _isActive.OnNext(value); }
    }

    public bool IsBroken { get { return _isBroken.Value; } }
    public bool Status { get { return _status.Value; } }

    public Foo(IObservable<MyObject> valueStream, IScheduler scheduler)
    {
        _isActive = new BehaviorSubject<bool>(false);
        _isBroken = new BehaviorSubject<bool>(false);
        _status = new BehaviorSubject<bool>(false);

        // for debugging purposes
        _isActive.Subscribe(a => Console.WriteLine(
             "Time: " + scheduler.Now.TimeOfDay + " IsActive: " + a));
        _isBroken.Subscribe(a => Console.WriteLine(
             "Time: " + scheduler.Now.TimeOfDay + " IsBroken: " + a));
        _status.Subscribe(a => Console.WriteLine(
              "Time: " + scheduler.Now.TimeOfDay + " Status: " + a));

        valueStream.Subscribe(UpdateValues);

        Observable.CombineLatest(
                    _isActive,
                    _status,
                    (a,s) => a & !s).DistinctUntilChanged()
                .Where(p => p)
                .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3),
                                                  scheduler)
                                           .TakeUntil(_status.Where(st => st)))
                .Subscribe(_ => _isBroken.OnNext(true));            
    }

    private void UpdateValues(MyObject obj)
    {
        _status.OnNext(obj.SpecialValue);
    }   
}   

public class MyObject
{
    public MyObject(bool specialValue)
    {
        SpecialValue = specialValue;
    }

    public bool SpecialValue { get; set; }
}

public class Tests : ReactiveTest
{
    public void Test()
    {
        var testScheduler = new TestScheduler();

        var statusStream = testScheduler.CreateColdObservable<bool>(
            OnNext(TimeSpan.FromSeconds(1).Ticks, false),
            OnNext(TimeSpan.FromSeconds(3).Ticks, true),
            OnNext(TimeSpan.FromSeconds(5).Ticks, false));

        var activeStream = testScheduler.CreateColdObservable<bool>(
            OnNext(TimeSpan.FromSeconds(1).Ticks, false),
            OnNext(TimeSpan.FromSeconds(6).Ticks, true));           

        var foo = new Foo(statusStream.Select(b => new MyObject(b)), testScheduler);

        activeStream.Subscribe(b => foo.IsActive = b);       

        testScheduler.Start();               
    }        
}

Response to comment

If you want isActive false to set isBroken false, then I think this adds up to now saying the following:

isActive isStatus Action
T        F        Set Broken True after 3 seconds unless any other result occurs
T        T        Set Broken False immediately if not already false, cancel timer
F        F        Set Broken False immediately if not already false, cancel timer
F        T        Set Broken False immediately if not already false, cancel timer

In that case, use the following query:

Observable.CombineLatest(
            _isActive,
            _status,
            (a,s) => a & !s).DistinctUntilChanged()                    
        .Select(p => p ? Observable.Timer(TimeSpan.FromSeconds(3),
                                               scheduler)
                                   .Select(_ => true)
                       : Observable.Return(false))
        .Switch()
        .DistinctUntilChanged()
        .Subscribe(res => _isBroken.OnNext(res));

Note the changes:

  • The Where(p => p) filter is gone, since false results now matter as well.
  • SelectMany is now a Select that turns each event into either
    • A Timer that returns true after 3 seconds
    • Or an immediate false
  • The result of the Select is a stream of bool streams: IObservable<IObservable<bool>>. We want any new stream appearing to cancel any previous stream. This is what the Switch will do - flattening the result in the process.
  • We now apply a second DistinctUntilChanged(), since a cancelled timer could cause two false values to appear in the stream consecutively.
  • Finally we assign the emerging bool value to isBroken.
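To see the Switch version behave as the table describes, here is a small sketch driven by virtual time. The class name SwitchQueryDemo, the local names, and the specific timings are assumptions for illustration only. It again assumes the System.Reactive and Microsoft.Reactive.Testing packages.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not from the original answer.
public static class SwitchQueryDemo
{
    // Returns IsBroken after: a cancelled timer, an expired timer, and going inactive.
    public static (bool AfterCancel, bool AfterExpiry, bool AfterInactive) Run()
    {
        var scheduler = new TestScheduler();
        var isActive = new BehaviorSubject<bool>(false);
        var status = new BehaviorSubject<bool>(false);
        var isBroken = new BehaviorSubject<bool>(false);

        var subscription = Observable
            .CombineLatest(isActive, status, (a, s) => a && !s)
            .DistinctUntilChanged()
            .Select(p => p
                ? Observable.Timer(TimeSpan.FromSeconds(3), scheduler).Select(_ => true)
                : Observable.Return(false))
            .Switch()                  // a newer inner stream cancels the previous one
            .DistinctUntilChanged()
            .Subscribe(res => isBroken.OnNext(res));

        isActive.OnNext(true);                               // T/F: timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        status.OnNext(true);                                 // T/T: timer cancelled
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
        var afterCancel = isBroken.Value;

        status.OnNext(false);                                // T/F: new timer starts
        scheduler.AdvanceBy(TimeSpan.FromSeconds(3).Ticks);  // timer expires
        var afterExpiry = isBroken.Value;

        isActive.OnNext(false);                              // F/F: reset immediately
        var afterInactive = isBroken.Value;

        subscription.Dispose();
        return (afterCancel, afterExpiry, afterInactive);
    }

    public static void Main()
    {
        var r = Run();
        Console.WriteLine(r.AfterCancel);    // False
        Console.WriteLine(r.AfterExpiry);    // True
        Console.WriteLine(r.AfterInactive);  // False
    }
}
```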

Upvotes: 1
