Louis Haußknecht

Reputation: 924

Rx debouncing inputs

I need to debounce an input-stream.

At the first occurrence of state 1, I need to wait for 5 seconds and verify that the last state was also 1. Only then do I have a stable signal.

(time)  0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-1
(result)                   -> 1

Here is an example of a non-stable signal.

(time)  0-1-2-3-4-5-6-7-8-9
(state) 0-0-0-0-0-1-0-1-0-0
(result)                   -> 0

I tried using a buffer, but a buffer has a fixed starting point, and I need to wait for 5 seconds starting from my first event.

Upvotes: 0

Views: 1425

Answers (2)

Lee Campbell

Reputation: 10783

Taking your requirements literally

At the first occurrence of state 1, I need to wait for 5 seconds and verify that the last state was also 1. Only then do I have a stable signal.

I can come up with a few ways to solve this problem. To clarify my assumptions: you just want to push the last value produced within 5 seconds of the first occurrence of a 1. This results in a single-value sequence producing either a 0 or a 1 (i.e. regardless of any further values the source sequence produces after those 5 seconds).

Here I recreate your sequence with some jiggery-pokery.

var source = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
    .Take(10)
    .Select(i => (i == 5 || i == 7 || i == 9) ? 1 : 0); // Should produce 1
    //.Select(i => (i == 5 || i == 7) ? 1 : 0);          // Should produce 0

All of the options below subscribe to the sequence more than once, so they need to share it. To share a sequence safely in Rx we Publish() it and connect it. I use automatic connecting via the RefCount() operator.

var sharedSource = source.Publish().RefCount();
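
To see what sharing buys you, here is a minimal sketch that counts how often the underlying sequence gets subscribed to when two observers attach. The names SharingSketch and CountUpstreamSubscriptions are made up for illustration, and a Subject stands in for the timer-based source so the check runs instantly.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharingSketch
{
    // Hypothetical helper: attaches two observers and reports how many
    // subscriptions reached the underlying sequence.
    public static int CountUpstreamSubscriptions(bool share)
    {
        var upstream = new Subject<int>();
        var count = 0;

        // Defer runs its factory once per subscription, so it works as a counter.
        IObservable<int> source = Observable.Defer(() =>
        {
            count++;
            return upstream.AsObservable();
        });

        if (share)
        {
            source = source.Publish().RefCount();
        }

        using (source.Subscribe(_ => { }))
        using (source.Subscribe(_ => { }))
        {
            return count;
        }
    }

    public static void Main()
    {
        Console.WriteLine(CountUpstreamSubscriptions(share: false)); // 2
        Console.WriteLine(CountUpstreamSubscriptions(share: true));  // 1
    }
}
```

Without sharing, each subscription to a cold source such as Observable.Timer starts its own independent run, so the two subscriptions in the queries below would not be looking at the same clock.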

1) In this solution we wait for the first value of 1, and then buffer the values of the sequence into buffers of 5 seconds each. We only take the first of these buffers. Once we get this buffer, we take its last value and push that. If the buffer is empty, I assume we push a 1, because the last value was the '1' that started the buffer.

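sharedSource.Where(state => state == 1)
            .Take(1)    // react to the first 1 only
            // From that moment, collect everything seen during the next 5 seconds
            .SelectMany(_ => sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1))
            // An empty buffer means nothing followed the initial 1, so the last state is still 1
            .Select(buffer => buffer.Any() ? buffer.Last() : 1)
            .Dump();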

2) In this solution I only start listening once we get a valid value (1), and then take all values until a timer triggers the termination. From those we take the last value produced.

var fromFirstValid = sharedSource.SkipWhile(state=>state==0);
fromFirstValid 
    .TakeUntil(
        fromFirstValid.Take(1)
                    .SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5))))
    .TakeLast(1)
    .Dump();

3) In this solution I use the Window operator to create a window that opens when the first value of '1' happens and closes when 5 seconds have elapsed. Again we just take the last value.

sharedSource.Window(
                sharedSource.Where(state=>state==1),
                _=>Observable.Timer(TimeSpan.FromSeconds(5)))
            .SelectMany(window=>window.TakeLast(1))
            .Take(1)
            .Dump();

So there are lots of different ways to skin a cat.
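
If you want to reuse one of these, option 2 could be wrapped in an extension method along these lines. This is only a sketch: the names StableSignalSketch and LastStateAfterFirstOne are invented here, and it uses the Publish overload that takes a selector to do the sharing inside the method instead of Publish().RefCount().

```csharp
using System;
using System.Reactive.Linq;

public static class StableSignalSketch
{
    // Hypothetical helper: emits the last state seen within `window` of the
    // first 1 (or up to completion of the source, if that comes first).
    public static IObservable<int> LastStateAfterFirstOne(
        this IObservable<int> source, TimeSpan window)
    {
        // Publish(selector) shares one subscription to `source` among all
        // uses of `shared` inside the lambda.
        return source.Publish(shared =>
        {
            var fromFirstOne = shared.SkipWhile(state => state != 1);
            var windowElapsed = fromFirstOne
                .Take(1)
                .SelectMany(_ => Observable.Timer(window));

            return fromFirstOne.TakeUntil(windowElapsed).TakeLast(1);
        });
    }

    public static void Main()
    {
        // Same test sequence as above: one value per second, 1 at 5, 7 and 9.
        var source = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
            .Take(10)
            .Select(i => (i == 5 || i == 7 || i == 9) ? 1 : 0);

        // Wait() blocks until the sequence completes and returns its last value.
        Console.WriteLine(source.LastStateAfterFirstOne(TimeSpan.FromSeconds(5)).Wait());
    }
}
```

Note that if the source never produces a 1, the result is an empty sequence, and Wait() throws on an empty sequence, so a real caller would probably want to Subscribe instead.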

Upvotes: 4

JerKimball

Reputation: 16894

It sounds (at a glance) like you want Throttle, not Buffer, although some more information on your use cases would help pin that down - at any rate, here's how you might Throttle your stream:

void Main()
{
    var subject = new Subject<int>();
    var source = subject.Publish().RefCount();

    var query = source
        // Emit the latest value once 5 seconds pass without a newer one
        // (every new value restarts the 5-second timer)
        .Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5)));

    using(query.Subscribe(Console.WriteLine))
    {
        // This sequence should produce a one
        subject.OnNext(1);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(1);
        Console.ReadLine();
        // This sequence should produce a zero
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(0);
        subject.OnNext(1);
        subject.OnNext(0);
        Console.ReadLine();
    }
}
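
One thing worth checking against your real signal: Throttle restarts its timer on every value, so it only emits once the source has been quiet for the whole interval. The sketch below feeds the sequence from the question at one value per second in virtual time. ThrottleSketch and Run are made-up names, and a HistoricalScheduler replaces the wall clock so nothing actually waits.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleSketch
{
    // Hypothetical helper: pushes one state per virtual second, then lets
    // 5 quiet seconds pass. Returns what Throttle emitted in each phase.
    public static (List<int> whileActive, List<int> afterQuiet) Run(int[] states)
    {
        var scheduler = new HistoricalScheduler(
            new DateTimeOffset(2000, 1, 1, 0, 0, 0, TimeSpan.Zero));
        var subject = new Subject<int>();
        var emitted = new List<int>();

        using (subject.Throttle(TimeSpan.FromSeconds(5), scheduler)
                      .Subscribe(v => emitted.Add(v)))
        {
            foreach (var state in states)
            {
                subject.OnNext(state);
                scheduler.AdvanceBy(TimeSpan.FromSeconds(1));
            }
            var whileActive = new List<int>(emitted);

            // No further values: the pending timer is finally allowed to fire.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(5));
            return (whileActive, emitted);
        }
    }

    public static void Main()
    {
        var (whileActive, afterQuiet) = Run(new[] { 0, 0, 0, 0, 0, 1, 0, 1, 0, 1 });
        Console.WriteLine(whileActive.Count);            // 0
        Console.WriteLine(string.Join(",", afterQuiet)); // 1
    }
}
```

So with a source that reports its state every second, nothing comes out until the reports stop (or the source completes). Whether that matches the "5 seconds after the first 1" requirement depends on how your input actually behaves.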

Upvotes: 1
