Kobi Hari

Reputation: 1248

Debounce Until in Rx.Net

I have 2 streams of events:

  1. A stream of mouse drag-and-drop events (drag start ... drag end ... drag start ... drag end)
  2. A stream of key press events ('a' ... 'b' ... 'c' ... 'd')

I need to combine them into a stream that contains only events from the second stream (so only key presses), but it must filter out all key presses that occur between a drag start and a drag end, except for the last one.

So if the sources are like this:

... Start ............... End .............. Start .............. End

and

...........'a'...'b'...'c'.......'d'...'e'..........'f'....'g'.......

The result should be like this:

...........................'c'...'d'...'e'..........................'g'

Is something like this possible using Rx.net in C#?

Upvotes: 3

Views: 1748

Answers (1)

Shlomo

Reputation: 14350

The answer is yes. Answer first, then explanation:

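using System;
using System.Reactive.Linq;

public static class X
{
    // gating: true = gate closed (drag start), false = gate open (drag end).
    public static IObservable<T> GatedDebounce<T>(this IObservable<T> source, IObservable<bool> gating)
    {
        return gating
            .StartWith(false)          // begin with the gate open
            .DistinctUntilChanged()    // force strict t, f, t, f alternation
            .Publish(_gating => source.Publish(_source => Observable.Merge(
                // While gated (true -> false): keep only the last value, if any.
                // TakeLast(1) is used rather than LastAsync(), because LastAsync()
                // signals an InvalidOperationException for a window with no values.
                _source
                    .Window(_gating.Where(b => b), _ => _gating.Where(b => !b))
                    .SelectMany(o => o.TakeLast(1)),
                // While not gated (false -> true): pass every value straight through.
                _source
                    .Window(_gating.Where(b => !b), _ => _gating.Where(b => b))
                    .Merge()
            )));
    }
}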

Then, given an IObservable<T> representing your values, and an IObservable<bool> representing where drags start and stop (true meaning drag-start, and false meaning drag-end), you would call it like this:

var throttledStream = valueStream.GatedDebounce(gateStream);
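As a rough check against the timeline in the question, here is a minimal sketch that drives the operator by hand. It assumes the System.Reactive package and a C# version with top-level statements. The subjects named drags and keys are hypothetical stand-ins for the real mouse and keyboard sources. The extension method is repeated so the sketch compiles by itself, and its copy uses TakeLast(1) in place of LastAsync() so that a drag with no key presses emits nothing instead of raising an error.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-ins for the real event sources.
var drags = new Subject<bool>();   // true = drag start, false = drag end
var keys = new Subject<char>();

var seen = new List<char>();
using var subscription = keys.GatedDebounce(drags).Subscribe(c => seen.Add(c));

// Replay the timeline from the question.
drags.OnNext(true);                // Start
keys.OnNext('a');
keys.OnNext('b');
keys.OnNext('c');
drags.OnNext(false);               // End: 'c' is released here
keys.OnNext('d');
keys.OnNext('e');
drags.OnNext(true);                // Start
keys.OnNext('f');
keys.OnNext('g');
drags.OnNext(false);               // End: 'g' is released here

Console.WriteLine(string.Join(" ", seen)); // prints: c d e g

public static class X
{
    public static IObservable<T> GatedDebounce<T>(this IObservable<T> source, IObservable<bool> gating)
    {
        return gating
            .StartWith(false)
            .DistinctUntilChanged()
            .Publish(_gating => source.Publish(_source => Observable.Merge(
                // Swapped in for LastAsync(): emits nothing for an empty window.
                _source
                    .Window(_gating.Where(b => b), _ => _gating.Where(b => !b))
                    .SelectMany(o => o.TakeLast(1)),
                _source
                    .Window(_gating.Where(b => !b), _ => _gating.Where(b => b))
                    .Merge()
            )));
    }
}
```

Note that the held-back key press is released at the moment the drag ends, not at the time it was originally pressed.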

Explanation:

To understand it better, let's throw out the Publish calls, and break it up into pieces:

Piece 1,

source
    .Window(gating.Where(b => b), _ => gating.Where(b => !b))
    .SelectMany(o => o.TakeLast(1)) // TakeLast(1) instead of LastAsync(), which errors on an empty window

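This Window call starts a sub-set observable (or window) whenever gating emits true, and ends that window whenever gating emits false. From each window we select the last item, if it exists; it is emitted only when the window closes. Note that LastAsync() signals an InvalidOperationException when a window is empty (a drag with no key presses), whereas TakeLast(1) simply emits nothing in that case.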

Piece 2,

source
    .Window(gating.Where(b => !b), _ => gating.Where(b => b))
    .Merge() //Equivalent to .SelectMany(o => o) if you prefer

This Window call does the opposite: it starts a window whenever gating emits false, and ends it whenever gating emits true. From that window we emit every item as soon as it arrives.
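To see what each piece contributes by itself, the following sketch runs both against the question's timeline without Publish or StartWith. It assumes the System.Reactive package; gating and source are hypothetical Subject instances standing in for the real streams, and TakeLast(1) stands in for LastAsync() so that an empty window would emit nothing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical hand-driven sources.
var gating = new Subject<bool>();
var source = new Subject<char>();

var piece1 = new List<char>();
var piece2 = new List<char>();

// Piece 1: windows open on true and close on false; keep the last item only.
source
    .Window(gating.Where(b => b), _ => gating.Where(b => !b))
    .SelectMany(o => o.TakeLast(1))
    .Subscribe(c => piece1.Add(c));

// Piece 2: windows open on false and close on true; pass everything through.
source
    .Window(gating.Where(b => !b), _ => gating.Where(b => b))
    .Merge()
    .Subscribe(c => piece2.Add(c));

source.OnNext('x');     // no window is open yet, so both pieces drop this
gating.OnNext(true);
source.OnNext('a');
source.OnNext('b');
source.OnNext('c');
gating.OnNext(false);
source.OnNext('d');
source.OnNext('e');
gating.OnNext(true);
source.OnNext('f');
source.OnNext('g');
gating.OnNext(false);

Console.WriteLine("Piece 1: " + string.Join(" ", piece1)); // Piece 1: c g
Console.WriteLine("Piece 2: " + string.Join(" ", piece2)); // Piece 2: d e
```

The dropped 'x' shows the gap that remains when no window has been opened before the first gating value arrives.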

Put these two together with Merge and you get 90% of the way to your solution. The rest:

  • The .StartWith(false) makes sure we open a pass-through window as soon as the observable starts; otherwise values that arrive before the first gating item are lost.
  • The DistinctUntilChanged() is a cheap way to make sure our gates go t, f, t, f and never repeat the same value twice in a row, which would cause two simultaneous windows to open.
  • The Publish calls are good practice: they share a single subscription to source and to gating among the Window operators, instead of each one subscribing separately. You can find better explanations for that in some other Q&As on here.
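The effect of the first two points on the gate itself can be sketched in isolation. This assumes the System.Reactive package; noisyGate is a hypothetical gate stream that repeats its signals.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical noisy gate: repeated drag-start and drag-end signals.
var noisyGate = new[] { true, true, false, false, true }.ToObservable();

var cleaned = new List<bool>();
noisyGate
    .StartWith(false)          // prepend an initial "gate open" value
    .DistinctUntilChanged()    // collapse consecutive duplicates
    .Subscribe(b => cleaned.Add(b));

Console.WriteLine(string.Join(" ", cleaned)); // prints: False True False True
```

After these two operators the gate strictly alternates, so every window that opens is closed before the next one of the same kind can open.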

Upvotes: 6
