dlf

Reputation: 9383

"Buffer until quiet" behavior from Reactive?

My problem is similar to the one the Nagle algorithm was created to solve, but not identical. I'd like to buffer the OnNext notifications from an IObservable<T> into an IObservable<IList<T>>, like so:

  1. When the first T notification arrives, add it to a buffer and start a countdown
  2. If another T notification arrives before the countdown expires, add it to the buffer and restart the countdown
  3. Once the countdown expires (i.e. the producer has been silent for some length of time), forward all the buffered T notifications as a single aggregate IList<T> notification.
  4. If the buffer size grows beyond some maximum before the countdown expires, send it anyway.

IObservable<IList<T>> Buffer(this IObservable<T>, TimeSpan, int, IScheduler) looked promising, but it appears to emit aggregate notifications at regular intervals rather than starting the timer when the first notification arrives and restarting it when more arrive. It also emits an empty list at the end of each time window if no notifications arrived during it.
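
The empty-window behaviour is easy to reproduce. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class and method names (TimedBufferDemo, FirstWindowSizes) are made up for illustration. It buffers a source that never emits and records the size of the first three buffers:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class TimedBufferDemo
{
    // Hypothetical helper: sizes of the first three buffers over a source that never emits.
    public static IList<int> FirstWindowSizes()
    {
        return Observable.Never<int>()
            .Buffer(TimeSpan.FromMilliseconds(100), 10) // close a buffer every 100 ms or at 10 items
            .Take(3)
            .Select(buffer => buffer.Count)
            .ToList()
            .Wait(); // blocks until the three buffers have arrived
    }

    static void Main()
    {
        // Each window closes on the timer even though nothing arrived.
        Console.WriteLine(string.Join(",", FirstWindowSizes())); // 0,0,0
    }
}
```

Every window is closed by the timer alone, which is why a silent producer still yields a steady stream of empty lists.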

I do not want to drop any of the T notifications; just buffer them.

Does anything like this exist, or do I need to write my own?

Upvotes: 4

Views: 1458

Answers (4)

An article about Rx posted by Ian Griffiths has an example implementation of such an operator in the section "LINQ Operators and Composition", together with a detailed explanation of how it works.

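using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class RxExt
{
    public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src,
        TimeSpan minimumInactivityPeriod, IScheduler scheduler)
    {
        // For every source item, emit +1 immediately and -1 once the inactivity period has passed.
        IObservable<int> onoffs =
            from _ in src
            from delta in
                Observable.Return(1, scheduler)
                    .Concat(Observable.Return(-1, scheduler)
                        .Delay(minimumInactivityPeriod, scheduler))
            select delta;
        // Running total: the number of items whose inactivity period has not yet elapsed.
        IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
        // The total returns to zero only after the source has been quiet for the full period.
        IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
        // Close the current buffer at each of those moments.
        return src.Buffer(zeroCrossings);
    }
}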

Upvotes: 0

shtse8

Reputation: 1365

Nice solutions. In my opinion, though, composing the behaviour from existing operators is convenient but not the best choice for performance.

Also, I would return IEnumerable instead of IList. Returning the least derived type (IEnumerable) leaves you the most leeway to change the underlying implementation later.

Here is my version, implemented as a custom operator.

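// Requires System.Timers (Timer), System.Reactive.Linq (Observable) and System.Reactive.Disposables (Disposable).
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IEnumerable<TValue>>(observer =>
    {
        // State is created per subscription so that subscribers do not share a buffer.
        // The timer callback runs on a thread-pool thread, so all access goes through the gate.
        var gate = new object();
        var buffer = new List<TValue>();
        // AutoReset = false: fire once per countdown instead of periodically.
        var aTimer = new Timer(threshold.TotalMilliseconds) { AutoReset = false };

        // Emits a snapshot of the buffer if it is non-empty. Call only while holding the gate.
        // A late Elapsed callback can at worst flush slightly early; it never drops items.
        void Flush()
        {
            aTimer.Stop();
            if (buffer.Count == 0)
                return;
            var batch = buffer.ToArray(); // copy, so clearing the buffer cannot empty the emitted batch
            buffer.Clear();
            observer.OnNext(batch);
        }

        aTimer.Elapsed += (sender, args) => { lock (gate) Flush(); };

        var subscription = @this.Subscribe(
            value =>
            {
                lock (gate)
                {
                    buffer.Add(value);
                    if (buffer.Count >= maxAmount)
                        Flush();
                    else
                    {
                        // Restart the countdown.
                        aTimer.Stop();
                        aTimer.Start();
                    }
                }
            },
            error => { lock (gate) { aTimer.Stop(); buffer.Clear(); observer.OnError(error); } },
            () => { lock (gate) { Flush(); observer.OnCompleted(); } }); // flush what is left, then complete

        return Disposable.Create(() =>
        {
            subscription.Dispose();
            lock (gate) { aTimer.Stop(); buffer.Clear(); }
            aTimer.Dispose();
        });
    });
}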

In my performance tests against the other solutions, it used up to 30% less CPU and resolved the memory issue.

Upvotes: 2

Niall Connaughton

Reputation: 16107

Interesting operator. Supertopi's answer is a good one, but there's an improvement that can be made. If maxAmount is large, and/or the rate of notifications is high, then using Buffer will put pressure on the GC by allocating buffers that get thrown away shortly afterwards.

To close each GroupByUntil group once maxAmount is reached, you don't need to capture a Buffer of all those elements just to know when it's full. Based on Supertopi's answer, you could change it slightly to the following: instead of collecting a Buffer of maxAmount elements, it just signals after it has seen maxAmount elements on the stream.

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Take(maxAmount)
                                                 .LastAsync()
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

Upvotes: 7

supertopi

Reputation: 3488

Some similar questions exist on SO, but none exactly like this. Here's an extension method that does the trick.

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                          (this IObservable<TSource> source,
                                           int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}
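
To see how the operator behaves, here is a small usage sketch. It assumes the System.Reactive package is referenced; the names BufferExtensions, ThrottleDemo and Run are invented for the example, and the extension method is repeated only so the snippet compiles on its own. The timings (a 200 ms threshold and generous sleeps) are arbitrary choices.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class BufferExtensions
{
    // The extension method from this answer, repeated so the sketch compiles on its own.
    public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(
        this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
    {
        return Observable.Create<IList<TSource>>(obs =>
            source.GroupByUntil(_ => true,
                                g => g.Throttle(threshold).Select(_ => Unit.Default)
                                      .Merge(g.Buffer(maxAmount).Select(_ => Unit.Default)))
                  .SelectMany(g => g.ToList())
                  .Subscribe(obs));
    }
}

static class ThrottleDemo
{
    // Hypothetical helper: pushes five items through the operator and
    // returns each emitted batch as a comma-separated string.
    public static List<string> Run()
    {
        var batches = new List<string>();
        var subject = new Subject<int>();
        using (subject.BufferWithThrottle(3, TimeSpan.FromMilliseconds(200))
                      .Subscribe(batch => { lock (batches) batches.Add(string.Join(",", batch)); }))
        {
            subject.OnNext(1);
            subject.OnNext(2);
            Thread.Sleep(1000); // silence well past the threshold: the group closes with 1,2
            subject.OnNext(3);
            subject.OnNext(4);
            subject.OnNext(5);  // third item reaches maxAmount: the group closes with 3,4,5
            Thread.Sleep(200);  // leave time for any pending callbacks before unsubscribing
        }
        lock (batches) return new List<string>(batches);
    }

    static void Main()
    {
        foreach (var batch in Run()) Console.WriteLine(batch);
    }
}
```

The first batch is closed by the quiet period and the second by the size limit, so both exit conditions from the question are exercised.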

Upvotes: 6
