Reputation: 9383
My problem is sort of like the one the Nagle algorithm was created to solve, but not exactly. What I'd like is to buffer the OnNext
notifications from an IObservable<T>
into a sequence of IObservable<IList<T>>
s like so:
T
notification arrives, add it to a buffer and start a countdownT
notification arrives before the countdown expires, add it to the buffer and restart the countdownT
notifications as a single aggregate IList<T>
notification.IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
looked promising, but it appears to send aggregate notifications out at regular intervals rather than doing the "start the timer when the first notification arrives and restart it when additional ones arrive" behavior I'd like, and it also sends out an empty list at the end of each time window if no notifications have been produced from below.
I do not want to drop any of the T
notifications; just buffer them.
Does anything like this exist, or do I need to write my own?
Upvotes: 4
Views: 1458
Reputation: 21
In the article about rx, posted Ian Griffiths on site, there is an example of the implementation of such an operator in the section LINQ Operators and Composition. A detailed description of his work is also given there.
static class RxExt
{
public static IObservable<IList<T>> Quiescent<T>( this IObservable<T> src,
TimeSpan minimumInactivityPeriod, IScheduler scheduler) {
IObservable<int> onoffs =
from _ in src
from delta in
Observable.Return(1, scheduler)
.Concat(Observable.Return(-1, scheduler)
.Delay(minimumInactivityPeriod, scheduler))
select delta;
IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
return src.Buffer(zeroCrossings);
}
}
Upvotes: 0
Reputation: 1365
Nice solutions. In my opinion, creating behaviour using existing operators is just for convenience but not for performance.
Also, we should always return IEnumerable instead of IList. Returning the least derived type (IEnumerable) will leave you the most leeway to change the underlying implementation down the track.
Here is my version to implement custom operator.
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
var buffer = new List<TValue>();
return Observable.Create<IEnumerable<TValue>>(observer =>
{
var aTimer = new Timer();
void Clear()
{
aTimer.Stop();
buffer.Clear();
}
void OnNext()
{
observer.OnNext(buffer);
Clear();
}
aTimer.Interval = threshold.TotalMilliseconds;
aTimer.Enabled = true;
aTimer.Elapsed += (sender, args) => OnNext();
var subscription = @this.Subscribe(value =>
{
buffer.Add(value);
if (buffer.Count >= maxAmount)
OnNext();
else
{
aTimer.Stop();
aTimer.Start();
}
});
return Disposable.Create(() =>
{
Clear();
subscription.Dispose();
});
});
}
By testing the performance comparing to other solutions, it can save up to 30% CPU power and resolve the memory issue.
Upvotes: 2
Reputation: 16107
Interesting operator. Supertopi's answer is a good one, but there's an improvement that can be made. If maxAmount
is large, and/or the rate of notifications is high, then using Buffer
will burn the GC by allocating buffers that get thrown away shortly afterwards.
In order to close each GroupBy
Observable after a maxAmount
is reached, you don't need to capture a Buffer
of all of those elements just to know when it's full. Based on Supertopi's answer, you could change it slightly to the following. Instead of collecting a Buffer
of maxAmount
elements, it just signals after it has seen maxAmount
elements on the stream.
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Take(maxAmount)
.LastAsync()
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
Upvotes: 7
Reputation: 3488
Some similar questions exist on SO but not exactly like this. Here's an extension method that does the trick.
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
Upvotes: 6