Stuart Hallows
Stuart Hallows

Reputation: 8953

Equivalent of RxJS throttleTime in Rx.NET

I'm trying to implement the equivalent of RxJS throttleTime using Rx.NET (it's not implemented in .NET).

throttleTime emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process. This differs from throttle which waits a time before emitting. throttleTime emits the first item and then waits before emitting again.

Any pointers as to how to go about this would be much appreciated.

Upvotes: 0

Views: 268

Answers (1)

Shlomo
Shlomo

Reputation: 14350

This should do it:

public static IObservable<T> ThrottleTime<T>(this IObservable<T> source, TimeSpan ts)
{
    return ThrottleTime(source, ts, Scheduler.Default);
}

public static IObservable<T> ThrottleTime<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
    return source
        .Timestamp(scheduler)
        .Scan((EmitValue: false, OpenTime: DateTimeOffset.MinValue, Item: default(T)), (state, item) => item.Timestamp > state.OpenTime
            ? (true, item.Timestamp + ts, item.Value)
            : (false, state.OpenTime, item.Value)
        )
        .Where(t => t.EmitValue)
        .Select(t => t.Item);
}

Explanation: Think of ThrottleTime as having a single state variable: The next time the gate opens to a new value. If the source item is before this timed-gate value, then nothing happens. If the source item is after it, then pass it through, and reset the gate value to the latest timestamp.

Scan holds the time value in a tuple (the OpenTime variable). The other fields on the tuple are more helpful downstream.

Upvotes: 2

Related Questions