Oleg Dok
Oleg Dok

Reputation: 21776

Rx.Net Window function extension to support count, time AND size

Actually I have an IObservable<byte[]> and want to window it not only on arrays count and time frame but also on the overall size of arrays passed the current window.

Please provide me with these techniques

Upvotes: 0

Views: 222

Answers (1)

Theodor Zoulias
Theodor Zoulias

Reputation: 43996

Here is a custom Window operator that attempts to replicate the functionality of the built-in Window overload that accepts timeSpan and count arguments, while also supporting the windowing by overall size.

public static IObservable<IObservable<TSource>> Window<TSource>(
    this IObservable<TSource> source, TimeSpan timeSpan, int count,
    int size, Func<TSource, int> sizeSelector, IScheduler scheduler = null)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero && timeSpan != Timeout.InfiniteTimeSpan)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    if (size < 0) throw new ArgumentOutOfRangeException(nameof(size));
    if (sizeSelector == null) throw new ArgumentNullException(nameof(sizeSelector));
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<IObservable<TSource>>(observer =>
    {
        Subject<TSource> currentSubject = null;
        IStopwatch stopwatch = null;
        int itemCounter = 0;
        int currentSize = 0;

        return source.Subscribe(item =>
        {
            if (currentSubject == null)
            {
                currentSubject = new Subject<TSource>();
                observer.OnNext(currentSubject);
            }
            if (stopwatch == null && timeSpan != Timeout.InfiniteTimeSpan)
            {
                stopwatch = scheduler.StartStopwatch();
            }

            currentSubject.OnNext(item);
            itemCounter++;
            currentSize += sizeSelector(item);

            if (itemCounter == count
                || (stopwatch != null && stopwatch.Elapsed >= timeSpan)
                || currentSize >= size)
            {
                currentSubject.OnCompleted();
                currentSubject = null;
                if (stopwatch != null) stopwatch = scheduler.StartStopwatch();
                itemCounter = 0;
                currentSize = 0;
            }
        }, ex =>
        {
            if (currentSubject != null) currentSubject.OnError(ex);
            observer.OnError(ex);
        }, () =>
        {
            if (currentSubject != null) currentSubject.OnCompleted();
            observer.OnCompleted();
        });
    });
}

Usage example:

observable.Window(timeSpan: TimeSpan.FromMilliseconds(5000), count: 10,
    size: 100, sizeSelector: x => x.Length);

A new window is created every time the current window is full, or the given amount of time has elapsed, or the accumulated size becomes larger than the given size. The timer is restarted every time a window is emitted.

Upvotes: 2

Related Questions