Reputation: 1929
How can I subscribe to a condition that has held for a given amount of time?
For example, suppose I have a BehaviorSubject<int> that represents a value between 0 and 100, and this value changes over time. I want to be notified when the value has stayed under 50 for 10 seconds continuously.
If the value goes back above 50 for a moment and then drops below 50 again, I want the 10-second count to start over. How can I do this?
Many thanks!
Upvotes: 5
Views: 3644
Reputation: 1929
I hope I'm not missing something, but I think the easiest way is this:
BehaviorSubject<int> value = new BehaviorSubject<int>(0);
value.Select(v => v < 50)
    .DistinctUntilChanged()
    .Throttle(TimeSpan.FromSeconds(10))
    .Where(x => x)
    .Subscribe(b => DoSomething());
Enigmativity's answer gave me the starting point for handling this. Thanks!
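One way to check this behaviour without waiting in real time is to pass a virtual-time scheduler to Throttle. The following is only a sketch, assuming the System.Reactive package is referenced; the names ThrottleSketch and Run, the initial value of 100, and the timings are made up for illustration and are not part of the answer above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleSketch
{
    // Hypothetical helper: returns the virtual times at which the
    // "below 50 for 10 seconds" notification fired.
    public static List<TimeSpan> Run()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced manually
        var start = scheduler.Now;
        var value = new BehaviorSubject<int>(100);   // assumed to start above the threshold
        var fired = new List<TimeSpan>();

        value.Select(v => v < 50)
             .DistinctUntilChanged()
             .Throttle(TimeSpan.FromSeconds(10), scheduler)
             .Where(below => below)
             .Subscribe(_ => fired.Add(scheduler.Now - start));

        value.OnNext(40);                              // t = 0s: drops below 50
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5));
        value.OnNext(60);                              // t = 5s: back above 50, pending notification is dropped
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
        value.OnNext(30);                              // t = 6s: below 50 again, the 10 seconds start over
        scheduler.AdvanceBy(TimeSpan.FromSeconds(20)); // run the virtual clock on to t = 26s
        return fired;
    }
}
```

With these timings the list should contain a single entry at the 16-second mark: the first dip is cancelled at 5 seconds, and the second dip at 6 seconds is the one that lasts the full 10 seconds.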
Upvotes: 4
Reputation: 15618
@GideonEngelberth makes a great point that my original solution (below) doesn't work. I've stolen some ideas from @Enigmativity, but where he stops short of actually using the 'switching stream' to retrieve values from the source, I have gone the extra step.
The result is this extension function:
public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
    var switches = published.Select(pred).DistinctUntilChanged();
    return published.Window(switches.Where(x => x), _ => switches.Where(x => !x))
        .SelectMany(xs => xs.SkipUntil(Observable.Timer(minTime)).TakeWhile(pred));
}
This uses Select(pred).DistinctUntilChanged() to give us a stream that fires once whenever we go under the threshold (true) and once whenever we go above the threshold (false).
We then leverage this stream to create a window over the source between each true and the following false. We skip the first minTime of that new stream and take values while we're still below the threshold.
Original: (not working)
This problem breaks down to saying:
When the stream is below 50, skip the first 10 seconds of values then return the stream. Do this until the stream is above 50. Then start all over again.
The beauty of Rx is that this can be translated as is to the relevant functions. Here's a simple extension method that was meant to do exactly this:
public static IObservable<T> WhereTimed<T>(this IObservable<T> source, Func<T,bool> pred, TimeSpan minTime)
{
    var published = source.Publish().RefCount(); // we make multiple subscriptions, let's share them
    var openers = published.Where(pred); // start taking at this point
    var closers = published.Where(z => !pred(z)); // stop taking at this point
    return openers.SkipUntil(Observable.Timer(minTime))
        .TakeUntil(closers)
        .Repeat();
}
Here is a test to show it working:
var ws = Observable.Repeat(1,10);
var xs = Observable.Repeat(2,10);
var ys = Observable.Repeat(100,10);
var zs = ws.Concat(xs.Delay(TimeSpan.FromSeconds(2)).Concat(ys)).Repeat(5);
zs.WhereTimed(z => z <= 50, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
Here the zs stream is:
(1 x 10) <2 second pause> (2 x 10) (100 x 10) [repeat 5 times]
Based on our rules, this should result in the first 2 second pause triggering the stream, and showing the 2 value 10 times. However, that should immediately reset when the 100 is hit; then, when the 1 gets played, there has been insufficient pause for any 1 to show. This then repeats a few times, and only the number 2 is ever shown.
Upvotes: 1
Reputation: 117175
Here's a fairly neat observable query that seemed to work properly for me:
var below50longerthan10seconds =
    subject
        .Select(x => x < 50)
        .DistinctUntilChanged()
        .Select(x =>
            Observable.Delay(
                Observable.Return(x),
                TimeSpan.FromSeconds(10.0)))
        .Switch()
        .Where(x => x)
        .Select(x => Unit.Default);
Here's the breakdown.
Map each value (0 to 100) to true when it is less than 50 and to false otherwise:
.Select(x => x < 50)
Only keep the actual changes between true and false:
.DistinctUntilChanged()
Project the value into a new observable that is delayed for 10 seconds:
.Select(x =>
    Observable.Delay(
        Observable.Return(x),
        TimeSpan.FromSeconds(10.0)))
Now perform a .Switch(): if a new x arrives before the delayed observable has fired, the pending one is dropped, because a new delayed observable takes its place:
.Switch()
Select only the true values, meaning the original stream was below 50:
.Where(x => x)
Select Unit.Default, simply because it is weird to have a stream of true values without any false values:
.Select(x => Unit.Default);
So now you have an IObservable<Unit> that emits a new value when the original stream produces a value less than 50 and then does not produce a value greater than or equal to 50 within 10 seconds.
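To try the query without waiting for real time to pass, the delay can be driven by a virtual-time scheduler. This is a rough sketch only, assuming the System.Reactive package; SwitchSketch, Run, and the sample values and timings are hypothetical, and the scheduler argument to Observable.Delay is an addition that the query above does not have.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchSketch
{
    // Hypothetical helper: returns the virtual times at which the query fired.
    public static List<TimeSpan> Run()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced manually
        var start = scheduler.Now;
        var subject = new Subject<int>();
        var fired = new List<TimeSpan>();

        var below50LongerThan10Seconds =
            subject
                .Select(x => x < 50)
                .DistinctUntilChanged()
                .Select(x =>
                    Observable.Delay(
                        Observable.Return(x),
                        TimeSpan.FromSeconds(10.0),
                        scheduler))                  // assumption: delay runs on the virtual clock
                .Switch()
                .Where(x => x)
                .Select(_ => Unit.Default);

        below50LongerThan10Seconds.Subscribe(_ => fired.Add(scheduler.Now - start));

        subject.OnNext(40);                              // t = 0s: below 50
        scheduler.AdvanceBy(TimeSpan.FromSeconds(3));
        subject.OnNext(30);                              // t = 3s: still below, no change so no restart
        scheduler.AdvanceBy(TimeSpan.FromSeconds(9));
        subject.OnNext(70);                              // t = 12s: above 50
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
        subject.OnNext(10);                              // t = 13s: below 50 again
        scheduler.AdvanceBy(TimeSpan.FromSeconds(7));
        subject.OnNext(80);                              // t = 20s: above after only 7 seconds
        scheduler.AdvanceBy(TimeSpan.FromSeconds(20));   // run the virtual clock on to t = 40s
        return fired;
    }
}
```

With these inputs the query should fire once, at the 10-second mark. The second dip (13s to 20s) is too short, and its delayed true is discarded by Switch when the 80 arrives.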
Is that what you wanted?
Upvotes: 7