Reputation: 21776
I have an IObservable<byte[]> and want to window it not only by the number of arrays and by a time frame, but also by the total size of the arrays that have passed through the current window.
How can I implement this?
Upvotes: 0
Views: 222
Reputation: 43996
Here is a custom Window operator that attempts to replicate the functionality of the built-in Window overload that accepts timeSpan and count arguments, while also supporting windowing by overall size.
// Requires: System, System.Threading, System.Reactive.Concurrency,
// System.Reactive.Linq, System.Reactive.Subjects.
// Must be declared inside a static class (it is an extension method).
public static IObservable<IObservable<TSource>> Window<TSource>(
    this IObservable<TSource> source, TimeSpan timeSpan, int count,
    int size, Func<TSource, int> sizeSelector, IScheduler scheduler = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero && timeSpan != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    if (size < 0) throw new ArgumentOutOfRangeException(nameof(size));
    if (sizeSelector == null) throw new ArgumentNullException(nameof(sizeSelector));
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<IObservable<TSource>>(observer =>
    {
        Subject<TSource> currentSubject = null; // the currently open window, if any
        IStopwatch stopwatch = null;            // stays null when timeSpan is infinite
        int itemCounter = 0;
        int currentSize = 0;

        return source.Subscribe(item =>
        {
            // Open a new window lazily, when its first item arrives.
            if (currentSubject == null)
            {
                currentSubject = new Subject<TSource>();
                observer.OnNext(currentSubject);
            }
            if (stopwatch == null && timeSpan != Timeout.InfiniteTimeSpan)
            {
                stopwatch = scheduler.StartStopwatch();
            }

            // The item always goes into the current window before the limits are checked.
            currentSubject.OnNext(item);
            itemCounter++;
            currentSize += sizeSelector(item);

            if (itemCounter == count
                || (stopwatch != null && stopwatch.Elapsed >= timeSpan)
                || currentSize >= size)
            {
                // Close the window and reset the counters and the timer.
                currentSubject.OnCompleted();
                currentSubject = null;
                if (stopwatch != null) stopwatch = scheduler.StartStopwatch();
                itemCounter = 0;
                currentSize = 0;
            }
        }, ex =>
        {
            if (currentSubject != null) currentSubject.OnError(ex);
            observer.OnError(ex);
        }, () =>
        {
            if (currentSubject != null) currentSubject.OnCompleted();
            observer.OnCompleted();
        });
    });
}
Usage example:
observable.Window(timeSpan: TimeSpan.FromMilliseconds(5000), count: 10,
    size: 100, sizeSelector: x => x.Length);
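A new window is opened when the first item arrives after the previous window has closed. The current window is closed when it contains count items, or the given amount of time has elapsed, or the accumulated size reaches or exceeds the given size. The item that crosses a limit is included in the window being closed. The elapsed time is checked only when an item arrives, so an idle window stays open until the next item. The timer is restarted every time a window is closed.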
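To make the closing rule easier to follow, here is a minimal synchronous sketch of the count and size conditions only, applied to an in-memory sequence. It does not use Rx and it leaves the time condition out; WindowRuleSketch and Split are hypothetical names introduced for this illustration, not part of any library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Rx): models only the count/size closing
// rule of the operator above. The time-based condition is intentionally omitted.
public static class WindowRuleSketch
{
    public static List<List<T>> Split<T>(
        IEnumerable<T> items, int count, int size, Func<T, int> sizeSelector)
    {
        var windows = new List<List<T>>();
        List<T> current = null; // the currently open window, if any
        int currentSize = 0;

        foreach (var item in items)
        {
            // Open a new window lazily, when its first item arrives.
            if (current == null)
            {
                current = new List<T>();
                windows.Add(current);
            }

            // The item is added first, then the limits are checked.
            current.Add(item);
            currentSize += sizeSelector(item);

            if (current.Count == count || currentSize >= size)
            {
                // Close the window; the next item opens a new one.
                current = null;
                currentSize = 0;
            }
        }
        return windows;
    }
}
```

For example, with count: 3 and size: 100, arrays of lengths 40, 70, 10, 10, 10, 200 fall into three windows: {40, 70} closes on size (110 >= 100), {10, 10, 10} closes on count, and {200} closes on size straight away.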
Upvotes: 2