yossharel

Reputation: 1929

Rx - How to subscribe to a condition state, but only once that state has held unchanged for a given period of time?

Do you know how to subscribe to a condition that has to hold for a given amount of time?

For example, suppose I have a BehaviorSubject<int> that holds an int value between 0 and 100, and this value changes over time. I want to be notified when the value has been under 50 for 10 seconds continuously.

If the value goes back above 50 for a moment and then drops below 50 again, the 10-second count should start over. How can I do this?

Many thanks!

Upvotes: 5

Views: 3644

Answers (3)

yossharel

Reputation: 1929

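I hope I'm not missing something, but I think the easiest way is this:

BehaviorSubject<int> value = new BehaviorSubject<int>(0);

value.Select(v => v < 50)                   // map each value to "is it below 50?"
     .DistinctUntilChanged()                // keep only the true/false transitions
     .Throttle(TimeSpan.FromSeconds(10))    // pass a transition only if no other follows within 10 seconds
     .Where(x => x)                         // keep only the "below 50" state
     .Subscribe(b => DoSomething());

Enigmativity's answer gave me the starting point for this. Thanks!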
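
To check the timing without waiting on a real clock, the same chain can be driven by a virtual-time scheduler. The following is only a sketch, assuming the System.Reactive package is referenced. The ThrottleDemo class, the Run method and the sample values are made up for illustration; HistoricalScheduler and the Throttle overload that takes a scheduler come from Rx.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleDemo   // hypothetical name, for illustration only
{
    // Returns the virtual times (relative to the start) at which the condition fired.
    public static List<TimeSpan> Run()
    {
        var scheduler = new HistoricalScheduler();      // virtual clock, advanced by hand
        var start = scheduler.Now;
        var value = new BehaviorSubject<int>(100);      // starts above the threshold
        var fired = new List<TimeSpan>();

        using (value.Select(v => v < 50)
                    .DistinctUntilChanged()
                    .Throttle(TimeSpan.FromSeconds(10), scheduler)
                    .Where(below => below)
                    .Subscribe(_ => fired.Add(scheduler.Now - start)))
        {
            value.OnNext(40);                               // t=0s: drops below 50
            scheduler.AdvanceBy(TimeSpan.FromSeconds(5));
            value.OnNext(60);                               // t=5s: back above 50, pending "true" is dropped
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            value.OnNext(30);                               // t=6s: below 50 again, the count restarts
            scheduler.AdvanceBy(TimeSpan.FromSeconds(4));
            value.OnNext(20);                               // t=10s: still below 50, so no transition and no restart
            scheduler.AdvanceBy(TimeSpan.FromSeconds(10));  // t=20s: the throttle was due at t=16s
        }

        return fired;
    }

    public static void Main()
    {
        foreach (var t in Run())
            Console.WriteLine($"below 50 for 10 seconds, fired at t={t.TotalSeconds}s");
    }
}
```

Note that the update at t=10s does not restart the count, because DistinctUntilChanged swallows it before it reaches Throttle. That is the behaviour asked for in the question: only crossing back above 50 resets the 10 seconds.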

Upvotes: 4

yamen

Reputation: 15618

@GideonEngelberth makes a great point that my original solution (below) doesn't work. I've stolen some ideas from @Enigmativity, but where he stops short of actually using the 'switching stream' to retrieve values from the source, I have gone the extra step.

The result is this extension function:

public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them

    var switches = published.Select(pred).DistinctUntilChanged();

    return published.Window(switches.Where(x => x), _ => switches.Where(x => !x))
                    .SelectMany(xs => xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}

This uses Select(pred).DistinctUntilChanged() to give us a stream that fires once whenever we go under the threshold (true) and once whenever we go back above it (false).

We then use this stream to open a window over the source values on each true and close it on the next false. Within each window we skip the first minTime of values and keep taking for as long as we're still below the threshold.


Original: (not working)

This problem breaks down to saying:

When the stream is below 50, skip the first 10 seconds of values then return the stream. Do this until the stream is above 50. Then start all over again.

The beauty of RX is that this can be translated as is to the relevant functions. Here's a simple extension method that does exactly this:

public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them

    var openers = published.Where(pred);            // start taking at this point
    var closers = published.Where(z => !pred(z));   // stop taking at this point

    return openers.SkipUntil(Observable.Timer(minTime))
                  .TakeUntil(closers)
                  .Repeat();
}

Here is a test to show it working:

var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);

zs.WhereTimed(z => z <= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

Here the zs stream is:

(1 x 10) <2 second pause> (2 x 10) (100 x 10) [repeat 5 times]

Based on our rules, this should result in the first 2 second pause triggering the stream, and showing the 2 value 10 times. However, that should immediately reset when the 100 is hit, then when the 1 gets played, there has been insufficient pause for any 1 to show. This then repeats a few times, and only the number 2 is ever shown.

Upvotes: 1

Enigmativity

Reputation: 117175

Here's a fairly neat observable query that seemed to work properly for me:

var below50longerthan10seconds =
    subject
        .Select(x => x < 50)
        .DistinctUntilChanged()
        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))
        .Switch()
        .Where(x => x)
        .Select(x => Unit.Default);

Here's the breakdown.

Change the values from 0 to 100 to true when less than 50 and false otherwise:

        .Select(x => x < 50)

Only keep the actual changes between true & false:

        .DistinctUntilChanged()

Project the value into a new observable that is delayed for 10 seconds:

        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))

Now perform a .Switch() - if a new x arrives before the previous delayed observable has produced its value, the previous one is unsubscribed and its value is never emitted, because Switch moves on to the new delayed observable:

        .Switch()

Select only the true values meaning when the original stream was below 50:

        .Where(x => x)

Select Unit.Default simply because it is weird to have a stream of true values without any false values:

        .Select(x => Unit.Default);

So now you have an IObservable<Unit> that releases a new value when the original stream produces a value less than 50 and does not produce a value greater than or equal to 50 within 10 seconds.

Is that what you wanted?
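
For anyone who wants to see the Switch() cancellation in action, here is a rough sketch that runs the same query on a virtual clock. It assumes the System.Reactive package. The SwitchDemo class, the Run method and the sample values are invented for illustration, and the only change to the query itself is the extra scheduler argument passed to Observable.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo   // hypothetical name, for illustration only
{
    // Returns the virtual times (relative to the start) at which a Unit was released.
    public static List<TimeSpan> Run()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
        var start = scheduler.Now;
        var subject = new Subject<int>();
        var fired = new List<TimeSpan>();

        IObservable<Unit> below50longerthan10seconds =
            subject
                .Select(x => x < 50)
                .DistinctUntilChanged()
                .Select(x =>
                    Observable.Delay(
                        Observable.Return(x),
                        TimeSpan.FromSeconds(10.0),
                        scheduler))                  // scheduler argument added for the demo
                .Switch()
                .Where(x => x)
                .Select(x => Unit.Default);

        using (below50longerthan10seconds.Subscribe(_ => fired.Add(scheduler.Now - start)))
        {
            subject.OnNext(40);                              // t=0s: below 50, delayed "true" due at t=10s
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));
            subject.OnNext(70);                              // t=3s: above 50, Switch drops the pending "true"
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2));
            subject.OnNext(10);                              // t=5s: below 50 again, new "true" due at t=15s
            scheduler.AdvanceBy(TimeSpan.FromSeconds(15));   // t=20s: nothing interrupted the second delay
        }

        return fired;
    }
}
```

The first dip below 50 lasts only 3 seconds, so its delayed value is discarded when the 70 arrives; only the second dip survives the full 10 seconds.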

Upvotes: 7
