spender
spender

Reputation: 120410

Zipping with an infinite sequence that is true then always false

I made a extension method:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
                                          TimeSpan minDelay)
{
    return
        source.TimeInterval()
            .Select(
                (x, i) =>
                    Observable.Return(x.Value)
                        .Delay(i == 0
                            ? TimeSpan.Zero
                            : TimeSpan.FromTicks(
                                  Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
            .Concat();
}

this creates a new observable that only lets items through with a minimum separation in time.

To remove initial latency, it is necessary to treat the first item differently.

As can be seen, there is a test to see if we are dealing with the first item by testing i == 0. The problem here is that if we process more than int.MaxValue items, this will fail.

Instead, I thought about the following sequence

var trueThenFalse = Observable.Return(true)
                    .Concat(Observable.Repeat(false))

and zipping it up alongside my source:

source.TimeInterval().Zip(trueThenFalse, ...

but when passing this infinite sequence to Zip, we appear to enter a tight loop where trueThenFalse emits all items in one go (to infinity). Fail.

I could easily code around this with side-effects (a bool in the outer scope, for instance), but this would represent a loss of purity that I would not be happy with.

Any suggestions?

EDIT

Although not quite the same behaviour, the following code exhibits some nasty traits

var trueThenFalse = Observable.Return(true)
    .Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));

and eventually dies with an OOME. This is because trueThenFalse appears to be unspooling all of its values, but they aren't consumed by the Zip in a timely fashion.

Upvotes: 4

Views: 205

Answers (2)

Shlomo
Shlomo

Reputation: 14350

This is similar to Rx IObservable buffering to smooth out bursts of events, though you clearly are trying to understand why your solution does/doesn't work.

I find the solution there more elegant, though to each their own.

Upvotes: 1

spender
spender

Reputation: 120410

So it turns out that Zip has another overload that can zip together an IObservable sequence with an IEnumerable sequence.

By combining the push semantics of IObservable with the pull semantics of IEnumerable, it is possible to get my test case working.

So, with the following method:

private IEnumerable<T> Inf<T>(T item)
{
    for (;;)
    {
        yield return item;
    }
}

we can make an IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));

and then Zip it with a source observable:

var src = Observable.Interval(TimeSpan.FromSeconds(1));
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x));

...and everything works as expected.

I now have the following implementation for my RateLimiter method:

public static IObservable<T> RateLimit<T>(this IObservable<T> source,
                                          TimeSpan minDelay)
{
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false));
    return
        source.TimeInterval()
            .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value)
                .Delay(firstTime
                    ? TimeSpan.Zero
                    : TimeSpan.FromTicks(
                        Math.Max(minDelay.Ticks - item.Interval.Ticks, 0))))

            .Concat();
}

Upvotes: 3

Related Questions