Reputation: 120410
I made an extension method:
public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                          TimeSpan minDelay)
{
    return
        source.TimeInterval()
              .Select(
                  (x, i) =>
                  Observable.Return(x.Value)
                            .Delay(i == 0
                                       ? TimeSpan.Zero
                                       : TimeSpan.FromTicks(
                                           Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
              .Concat();
}
This creates a new observable that only lets items through with a minimum separation in time.
To remove initial latency, it is necessary to treat the first item differently.
As can be seen, the code checks whether it is dealing with the first item by testing i == 0. The problem is that i is an int, so if we process more than int.MaxValue items, the index overflows and this will fail.
Instead, I thought about the following sequence:
var trueThenFalse = Observable.Return(true)
                              .Concat(Observable.Repeat(false));
and zipping it up alongside my source:
source.TimeInterval().Zip(trueThenFalse, ...
but when passing this infinite sequence to Zip, we appear to enter a tight loop where trueThenFalse emits all items in one go (to infinity). Fail.
I could easily code around this with side-effects (a bool in the outer scope, for instance), but this would represent a loss of purity that I would not be happy with.
Any suggestions?
EDIT
Although not quite the same behaviour, the following code exhibits some nasty traits
var trueThenFalse = Observable.Return(true)
                              .Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); // never completes
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));
and eventually dies with an OutOfMemoryException. This is because trueThenFalse appears to be unspooling all of its values, and Zip has to hold on to them because src only supplies one item per second to pair them with.
Upvotes: 4
Views: 205
Reputation: 14350
This is similar to the question "Rx IObservable buffering to smooth out bursts of events", though you are clearly also trying to understand why your own solution does or doesn't work.
I find the solution there more elegant, though to each their own.
Upvotes: 1
Reputation: 120410
So it turns out that Zip has another overload that can zip together an IObservable sequence with an IEnumerable sequence.
By combining the push semantics of IObservable with the pull semantics of IEnumerable, it is possible to get my test case working.
So, with the following method:
// static so that it can be called from the static RateLimit extension method
private static IEnumerable<T> Inf<T>(T item)
{
    for (;;)
    {
        yield return item;
    }
}
we can make an IEnumerable:
var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
and then Zip it with a source observable:
var src = Observable.Interval(TimeSpan.FromSeconds(1));
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));
...and everything works as expected.
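To illustrate why the pull-based sequence behaves, here is a minimal sketch that uses only LINQ to Objects (no Rx). The class name LazyFlagDemo and the helper FirstFlags are made up for this illustration; Inf has the same shape as the method above. The point is that the iterator only produces a value when the consumer asks for one, so taking a few items from the infinite sequence terminates immediately.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class, not part of the original code.
public static class LazyFlagDemo
{
    // Infinite, lazily evaluated sequence: each value is only produced
    // when the consumer calls MoveNext.
    public static IEnumerable<T> Inf<T>(T item)
    {
        for (;;)
        {
            yield return item;
        }
    }

    // Pulls the first 'count' flags from the true-then-false sequence.
    public static List<bool> FirstFlags(int count)
    {
        var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
        return trueThenFalse.Take(count).ToList();
    }

    public static void Main()
    {
        // Take(4) stops pulling after four items, so this returns
        // even though the underlying sequence never ends.
        Console.WriteLine(string.Join(", ", FirstFlags(4)));
    }
}
```

The Zip overload that takes an IEnumerable presumably consumes the sequence in the same way, pulling one flag for each item the observable pushes, which would explain why nothing piles up in memory.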
I now have the following implementation for my RateLimit method:
public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                          TimeSpan minDelay)
{
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
    return
        source.TimeInterval()
              .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value)
                  .Delay(firstTime
                             ? TimeSpan.Zero
                             : TimeSpan.FromTicks(
                                 Math.Max(minDelay.Ticks - item.Interval.Ticks, 0))))
              .Concat();
}
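For anyone who wants to check the delay arithmetic without setting up Rx, here is a rough sketch that applies the same calculation to a fixed list of intervals using plain LINQ. DelaySketch, ExtraDelay and ExtraDelays are names invented for this illustration; only the formula and the trueThenFalse sequence come from the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper class, used only to illustrate the arithmetic.
public static class DelaySketch
{
    private static IEnumerable<T> Inf<T>(T item)
    {
        for (;;)
        {
            yield return item;
        }
    }

    // Extra delay for one item: none for the first item, otherwise whatever
    // remains of minDelay after the time that has already elapsed.
    public static TimeSpan ExtraDelay(bool firstTime, TimeSpan minDelay, TimeSpan interval)
    {
        return firstTime
            ? TimeSpan.Zero
            : TimeSpan.FromTicks(Math.Max(minDelay.Ticks - interval.Ticks, 0));
    }

    // Pairs each interval with the first-item flag, as RateLimit does with Zip.
    // Enumerable.Zip stops when the finite 'intervals' sequence ends.
    public static List<TimeSpan> ExtraDelays(TimeSpan minDelay, IEnumerable<TimeSpan> intervals)
    {
        var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
        return intervals
            .Zip(trueThenFalse, (interval, firstTime) => ExtraDelay(firstTime, minDelay, interval))
            .ToList();
    }

    public static void Main()
    {
        var intervals = new[]
        {
            TimeSpan.FromMilliseconds(100),
            TimeSpan.FromMilliseconds(300),
            TimeSpan.FromSeconds(2)
        };
        foreach (var d in ExtraDelays(TimeSpan.FromSeconds(1), intervals))
        {
            Console.WriteLine(d.TotalMilliseconds);
        }
    }
}
```

With a minimum delay of one second, the first item gets no extra delay because of the flag, the item arriving 300 ms after its predecessor is held back by the remaining 700 ms, and the item arriving after 2 s goes straight through.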
Upvotes: 3