Reputation: 1421
All the documentation I found about rx.net throttling does not cover the overload with a parameter Func<TSource, IObservable<TThrottle>> throttleDurationSelector
. So all I have available are the XML-comments. This suggests, that throttleDurationSelector
is called on every new element in the source sequence. The expected return value would be a IObservable<TThrottle>
. (My understanding) This enables the possibility to change the throttle-delay on every new element. But, this understanding does not match the runtime experience I discover.
var s = new Subject<string>();
s.Throttle(_ => Observable.Return(TimeSpan.FromMilliseconds(500))) // for simplicity of this demo, always return the same delay
.Subscribe(_ => Console.WriteLine($"{DateTime.Now}.{DateTime.Now.Millisecond} event {_}"));
for (int i = 0; i < 5; i++)
s.OnNext("a");
Thread.Sleep(1000);
for (int i = 0; i < 5; i++)
s.OnNext("b");
Thread.Sleep(1000);
According to the previously mentioned understanding (which is obviously wrong), I'd have expected the following output.
10.02.2022 11:47:54.386 event a
10.02.2022 11:47:55.388 event b
Instead this output is generated. It seems that there is no throttling applied at all.
10.02.2022 11:46:49.431 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:49.432 event a
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.448 event b
10.02.2022 11:46:50.449 event b
What is the purpose of this overload, if not dynamically changing the throttle-delay (which is what I need)?
Also, why the 'complicated' syntax of Func<TSource, IObservable<TThrottle>> throttleDurationSelector
where a simpler parameter Func<TSource, TThrottle> throttleDurationSelector
would be good enough?
Upvotes: 1
Views: 195
Reputation: 43553
We are talking about this overload of the Throttle
operator:
public static IObservable<TSource> Throttle<TSource, TThrottle>(
this IObservable<TSource> source,
Func<TSource, IObservable<TThrottle>> throttleDurationSelector);
The intention of the throttleDurationSelector
parameter is to give you maximum flexibility at how you define the throttle duration for each element. The value of the TThrottle
is not important. What is important is when it emits an element (any element), or when it completes. This is how the duration is defined, by observing dynamically the TThrottle
sequence, and waiting it to emit any kind of notification.
Going back to your code, you are returning an Observable.Return
as the throttling sequence. This sequence emits a TimeSpan
element immediately, causing the associated string
element of the source s
sequence to be also emitted immediately. The result is that no throttling occurs. It is equivalent to throttling with a TimeSpan.Zero
argument, or with removing the Throttle
operator altogether.
To fix your code, you must replace the Observable.Return
with a sequence that emits an element with a delay, like an Observable.Timer
sequence. So if you know the throttling duration immediately after receiving a TSource
element, and let's say for simplicity that it is always 500 milliseconds, you can just replace this:
s.Throttle(_ => Observable.Return(TimeSpan.FromMilliseconds(500)))
...with this:
s.Throttle(_ => Observable.Timer(TimeSpan.FromMilliseconds(500)))
Upvotes: 3