Reputation: 8953
I'm trying to implement the equivalent of RxJS throttleTime
using Rx.NET (it's not implemented in .NET).
throttleTime
emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.
This differs from throttle
which waits a time before emitting. throttleTime
emits the first item and then waits before emitting again.
Any pointers as to how to go about this would be much appreciated.
Upvotes: 0
Views: 268
Reputation: 14350
This should do it:
public static IObservable<T> ThrottleTime<T>(this IObservable<T> source, TimeSpan ts)
{
return ThrottleTime(source, ts, Scheduler.Default);
}
public static IObservable<T> ThrottleTime<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan((EmitValue: false, OpenTime: DateTimeOffset.MinValue, Item: default(T)), (state, item) => item.Timestamp > state.OpenTime
? (true, item.Timestamp + ts, item.Value)
: (false, state.OpenTime, item.Value)
)
.Where(t => t.EmitValue)
.Select(t => t.Item);
}
Explanation: Think of ThrottleTime
as having a single state variable: The next time the gate opens to a new value. If the source item is before this timed-gate value, then nothing happens. If the source item is after it, then pass it through, and reset the gate value to the latest timestamp.
Scan
holds the time value in a tuple (the OpenTime
variable). The other fields on the tuple are more helpful downstream.
Upvotes: 2