Reputation: 1248
I have 2 streams of events:
1. A stream of mouse drag-and-drop events (drag start ... drag end ... drag start ... drag end)
2. A stream of key press events ('a' ... 'b' ... 'c' ... 'd')
I need to combine them into a stream that contains only events from the second stream (so only key presses), but it must filter out all key presses that occur between a drag start and a drag end, except for the last one.
So if the sources are like this:
... Start ............... End .............. Start .............. End
and
...........'a'...'b'...'c'.......'d'...'e'..........'f'....'g'.......
The result should be like this:
...........................'c'...'d'...'e'..........................'g'
Is something like this possible using Rx.NET in C#?
Upvotes: 3
Views: 1748
Reputation: 14350
The answer is yes. Answer first, then explanation:
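using System;
using System.Reactive.Linq;

public static class X
{
    public static IObservable<T> GatedDebounce<T>(this IObservable<T> source, IObservable<bool> gating)
    {
        var finalStream = gating
            .StartWith(false)
            .DistinctUntilChanged()
            .Publish(_gating => source.Publish(_source => Observable.Merge(
                // While the gate is true (dragging): emit only the last value of each window.
                // TakeLast(1) emits nothing for an empty window, whereas LastAsync() would signal an error.
                _source
                    .Window(_gating.Where(b => b), _ => _gating.Where(b => !b))
                    .SelectMany(o => o.TakeLast(1)),
                // While the gate is false (not dragging): pass every value through as it arrives.
                _source
                    .Window(_gating.Where(b => !b), _ => _gating.Where(b => b))
                    .Merge()
            )));
        return finalStream;
    }
}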
Then, given an IObservable<T> representing your values, and an IObservable<bool> representing where drags start and stop (true meaning drag-start, and false meaning drag-end), you would call it like this:
    var throttledStream = valueStream.GatedDebounce(gateStream);
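The answer assumes you already have an IObservable<bool> gate. As a minimal sketch of one way to build it from two separate drag streams, the helper below maps starts to true and ends to false and merges them. The names GateStreams, ToGate, and GateDemo are hypothetical and exist only for this illustration; the demo drives the gate with Subjects rather than real mouse events.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GateStreams
{
    // Hypothetical helper: maps drag-start events to true and drag-end events to false.
    public static IObservable<bool> ToGate<TStart, TEnd>(
        IObservable<TStart> dragStarts, IObservable<TEnd> dragEnds) =>
        Observable.Merge(dragStarts.Select(_ => true), dragEnds.Select(_ => false));
}

public static class GateDemo
{
    // Pushes start, end, start, end through two subjects and records the gate values.
    public static List<bool> Run()
    {
        var dragStarts = new Subject<Unit>();
        var dragEnds = new Subject<Unit>();
        var seen = new List<bool>();

        using (GateStreams.ToGate(dragStarts, dragEnds).Subscribe(seen.Add))
        {
            dragStarts.OnNext(Unit.Default);
            dragEnds.OnNext(Unit.Default);
            dragStarts.OnNext(Unit.Default);
            dragEnds.OnNext(Unit.Default);
        }

        return seen; // true, false, true, false
    }
}
```

With such a gate, the call would read keyPresses.GatedDebounce(GateStreams.ToGate(dragStarts, dragEnds)), where keyPresses, dragStarts, and dragEnds stand in for however your application exposes those events (for example via Observable.FromEventPattern).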
Explanation:
To understand it better, let's throw out the Publish calls and break it up into pieces:
Piece 1,
    source
        .Window(gating.Where(b => b), _ => gating.Where(b => !b))
        .SelectMany(o => o.TakeLast(1)) // LastAsync() would signal an error if the window were empty
This Window call starts a sub-observable (a window) whenever gating emits true, and ends that window whenever gating emits false. From each window we take the last item, if there is one. It is emitted only when the window closes.
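To see Piece 1 on its own, here is a small sketch driven by Subjects instead of real input. The class name LastPerDragDemo and the test values are made up for illustration. It uses TakeLast(1) to pick the last item, an assumption made here so that a drag containing no key presses yields nothing; LastAsync() signals an InvalidOperationException through OnError when its source is empty.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LastPerDragDemo
{
    public static List<char> Run()
    {
        var keys = new Subject<char>();
        var gate = new Subject<bool>();
        var seen = new List<char>();

        // A window opens on true and closes on the next false;
        // each window contributes at most its final element.
        var lastPerDrag = keys
            .Window(gate.Where(b => b), _ => gate.Where(b => !b))
            .SelectMany(w => w.TakeLast(1));

        using (lastPerDrag.Subscribe(seen.Add))
        {
            keys.OnNext('x');   // no window is open, so this piece ignores it
            gate.OnNext(true);  // drag start
            keys.OnNext('a');
            keys.OnNext('b');
            keys.OnNext('c');
            gate.OnNext(false); // drag end: 'c' is emitted now
            gate.OnNext(true);  // a drag with no key presses
            gate.OnNext(false); // nothing is emitted, and no error is raised
        }

        return seen; // contains only 'c'
    }
}
```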
Piece 2,
    source
        .Window(gating.Where(b => !b), _ => gating.Where(b => b))
        .Merge() // Equivalent to .SelectMany(o => o) if you prefer
This Window function does the opposite: start a window whenever gating emits false, and end it whenever gating emits true. From that window we emit everything when it arrives.
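Piece 2 can be tried in isolation the same way. In this sketch (PassThroughDemo and its values are hypothetical), the first gate.OnNext(false) plays the role that StartWith(false) has in the full method: without it no pass-through window would be open yet.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PassThroughDemo
{
    public static List<char> Run()
    {
        var keys = new Subject<char>();
        var gate = new Subject<bool>();
        var seen = new List<char>();

        // A window opens on false and closes on the next true;
        // Merge() forwards every element of every window immediately.
        var passThrough = keys
            .Window(gate.Where(b => !b), _ => gate.Where(b => b))
            .Merge();

        using (passThrough.Subscribe(seen.Add))
        {
            gate.OnNext(false); // not dragging: open a window
            keys.OnNext('d');
            keys.OnNext('e');
            gate.OnNext(true);  // drag start: the window closes
            keys.OnNext('f');   // dropped by this piece (Piece 1 handles drags)
            gate.OnNext(false); // drag end: a new window opens
            keys.OnNext('h');
        }

        return seen; // 'd', 'e', 'h'
    }
}
```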
Put these two together with Merge and you get 90% of the way to your solution. The rest:
.StartWith(false) is to make sure we open a window when you initially start the observable; otherwise values that happen before the first gating item are lost.
.DistinctUntilChanged() is a cheap way to make sure our gates are t, f, t, f and never two of the same value in a row, which would cause two simultaneous windows to open.
The Publish calls are good practice to prevent multiple subscriptions. You can find better explanations for that in some other Q&A's on here.
Upvotes: 6