Reputation: 924
I need to debounce an input-stream.
At the first occurrence of state 1 I need to wait for 5 Seconds and verify if the laste state was also 1. Only than I have a stable signal.
(time) 0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-1
(result) -> 1
Here is an example of a non-stable signal.
(time) 0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-0
(result) -> 0
I tried using a buffer, but a buffer has fixed starting point and I need to wait for 5 seconds starting with my first event.
Upvotes: 0
Views: 1425
Reputation: 10783
Taking your requirements literally
At the first occurrence of state 1 I need to wait for 5 Seconds and verify if the laste state was also 1. Only than I have a stable signal.
I can come up with a few ways to solve this problem. To clarify my assumptions, you just want to push the last value produced 5 seconds after the first occurrence of a 1. This will result in a single value sequence producing either a 0 or a 1 (ie. regardless of any further values produced past 5 seconds from the source sequence)
Here I recreate you sequence with some jiggery-pokery.
var source = Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1))
.Take(10)
.Select(i=>{if(i==5 || i==7 || i==9){return 1;}else{return 0;}}); //Should produce 1;
//.Select(i=>{if(i==5 || i==7 ){return 1;}else{return 0;}}); //Should produce 0;
All of the options below look to share the sequence. To share a sequence safely in Rx we Publish() and connect it. I use automatic connecting via the RefCount() operator.
var sharedSource = source.Publish().RefCount();
1) In this solution we take the first value of 1, and then buffer the selected the values of the sequence in to buffer sizes of 5 seconds. We only take the first of these buffers. Once we get this buffer, we get the last value and push that. If the buffer is empty, I assume we push a one as the last value was the '1' that started the buffer from running.
sharedSource.Where(state=>state==1)
.Take(1)
.SelectMany(_=>sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1))
.Select(buffer=>
{
if(buffer.Any())
{
return buffer.Last();
}
else{
return 1;
}
})
.Dump();
2) In this solution I take the approach to only start listening once we get a valid value (1) and then take all values until a timer triggers the termination. From here we take the last value produced.
var fromFirstValid = sharedSource.SkipWhile(state=>state==0);
fromFirstValid
.TakeUntil(
fromFirstValid.Take(1)
.SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5))))
.TakeLast(1)
.Dump();
3) In this solution I use the window operator to create a single window that opens when the first value of '1' happens and then closes when 5 seconds elapses. Again we just take the last value
sharedSource.Window(
sharedSource.Where(state=>state==1),
_=>Observable.Timer(TimeSpan.FromSeconds(5)))
.SelectMany(window=>window.TakeLast(1))
.Take(1)
.Dump();
So lots of different ways to skin-a-cat.
Upvotes: 4
Reputation: 16894
It sounds (at a glance) like you want Throttle
, not Buffer
, although some more information on your use cases would help pin that down - at any rate, here's how you might Throttle
your stream:
void Main()
{
var subject = new Subject<int>();
var source = subject.Publish().RefCount();
var query = source
// Start counting on a 1, wait 5 seconds, and take the last value
.Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5)));
using(query.Subscribe(Console.WriteLine))
{
// This sequence should produce a one
subject.OnNext(1);
subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(1);
Console.ReadLine();
// This sequence should produce a zero
subject.OnNext(0);
subject.OnNext(0);
subject.OnNext(0);
subject.OnNext(0);
subject.OnNext(1);
subject.OnNext(0);
Console.ReadLine();
}
}
Upvotes: 1