Bogey

Reputation: 5724

System.Reactive - Buffer/group immediately available values of an Observable

Let's say you have an IObservable<T> that may supply a few values immediately, followed by others that are pushed continuously over time:

    var immediate_values = new [] { "currently", "available", "values" }.ToObservable();
    var future_values = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1)).Select(x => "new value!");

    IObservable<string> input = immediate_values.Concat(future_values);

Is there any way to transform input into an IObservable<string[]>, where the first array pushed consists of all immediately available values, and each subsequent array contains exactly one value (each value pushed thereafter)? The above is just example data, naturally; this would need to work on any IObservable<T> without knowledge of the individual input streams.

    IObservable<string[]> buffered = input.BufferSomehow();
    // should push values:
    // First value: string[] = ["currently", "available", "values"]
    // Second value: string[] = ["new value!"]
    // Third value: string[] = ["new value!"]
    // .....

I've thought of the .Buffer() function of course, but I don't really want to buffer by any particular TimeSpan, and can't think of any way to produce an observable with buffer window closing signals.

Can anyone think of a reasonable way to achieve this, or is this not really possible at all?

Thanks!

Upvotes: 2

Views: 606

Answers (3)

Theodor Zoulias

Reputation: 43474

You could take advantage of the fact that the immediately available values are propagated synchronously during the subscription, and toggle a switch after the Subscribe method returns. The implementation below is based on this idea: during the subscription all incoming messages are buffered; once Subscribe returns, the buffer is emitted as a single array; after that, every incoming message is emitted immediately as a one-element array.

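// Requires: using System; using System.Collections.Generic; using System.Reactive.Linq;
// As an extension method, this must be declared inside a static class.
public static IObservable<T[]> BufferImmediatelyAvailable<T>(
    this IObservable<T> source)
{
    return Observable.Create<T[]>(observer =>
    {
        // Non-null only while values are still being collected.
        // Not synchronized: assumes the source does not emit from another
        // thread while Subscribe is still running.
        var buffer = new List<T>();

        var subscription = source.Subscribe(x =>
        {
            if (buffer != null)
                buffer.Add(x); // Still inside Subscribe: collect.
            else
                observer.OnNext(new[] { x }); // Later value: emit on its own.
        }, ex =>
        {
            buffer = null;
            observer.OnError(ex);
        }, () =>
        {
            // The source completed during Subscribe: flush before completing.
            if (buffer != null)
            {
                var output = buffer.ToArray();
                buffer = null;
                observer.OnNext(output);
            }
            observer.OnCompleted();
        });

        // Subscribe has returned: emit everything that was collected so far.
        if (buffer != null)
        {
            var output = buffer.ToArray();
            buffer = null;
            observer.OnNext(output);
        }

        return subscription;
    });
}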
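
The premise this relies on (values pushed while Subscribe is still running) can be observed in isolation. The following is only a minimal sketch, not part of the implementation above: the class name, the `later` subject and the `subscribeReturned` flag are made up for illustration, and the array is emitted on ImmediateScheduler to make the synchronous behavior explicit.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class SynchronousEmissionDemo
{
    static void Main()
    {
        // Hypothetical stand-in for the values that arrive later.
        var later = new Subject<string>();

        // ImmediateScheduler makes the array elements get pushed
        // inside the Subscribe call itself.
        var source = new[] { "currently", "available", "values" }
            .ToObservable(ImmediateScheduler.Instance)
            .Concat(later);

        var subscribeReturned = false;

        var subscription = source.Subscribe(x =>
            Console.WriteLine(x + " (after Subscribe returned: " + subscribeReturned + ")"));

        // From here on, anything that arrives is a "future" value.
        subscribeReturned = true;

        later.OnNext("new value!");

        subscription.Dispose();
    }
}
```

Under these assumptions the three array elements should be reported while the flag is still false, and the value pushed through the subject afterwards should be reported with the flag set to true.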

Upvotes: 0

Shlomo

Reputation: 14350

There is no direct way to distinguish between the on-start-up values of an observable and the subsequent values. My suggestion would be to infer it:

var autoBufferedInput1 = input.Publish(_input => _input
   .Buffer(_input.Throttle(TimeSpan.FromSeconds(.1)))
   .Select(l => l.ToArray())
);

This sets your buffer boundary to a rolling, extending window of .1 seconds: Each time a value comes in, it extends the window to .1 seconds from the time the value came in, and adds the value to the buffer. If .1 seconds go by with no values, then the buffer is flushed out.
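
To make the rolling window concrete, here is a small sketch that feeds the same Buffer/Throttle pipeline from a Subject. The subject, the class name and the sleep durations are assumptions chosen for illustration; real timings depend on the scheduler and on machine load.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class ThrottleBoundaryDemo
{
    static void Main()
    {
        // Hypothetical stand-in for the question's input.
        var input = new Subject<string>();

        var buffered = input.Publish(shared => shared
            // Close the current buffer once 0.1 s pass without a new value.
            .Buffer(shared.Throttle(TimeSpan.FromSeconds(0.1)))
            .Select(list => list.ToArray()));

        var subscription = buffered.Subscribe(
            batch => Console.WriteLine("[" + string.Join(", ", batch) + "]"));

        // A burst with no pause in between: expected to land in one buffer.
        input.OnNext("currently");
        input.OnNext("available");
        input.OnNext("values");

        // Quiet period, well above the 0.1 s throttle window.
        Thread.Sleep(500);

        // Arrives alone, so it is expected to get a buffer of its own.
        input.OnNext("new value!");
        Thread.Sleep(500);

        subscription.Dispose();
    }
}
```

The buffers are delivered on the thread of the Throttle timer, so the main thread has to stay alive (here through Thread.Sleep) long enough for them to be printed.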

This will have the side-effect that if you have near-simultaneous "hot" values (within .1 seconds of each other), then those will be buffered together. If that's undesired, you can Switch out, though that makes things more complicated:

var autoBufferedInput2 = input.Publish(_input =>
    _input.Throttle(TimeSpan.FromSeconds(.1)).Publish(_boundary => _boundary
        .Take(1)
        .Select(_ => _input.Select(s => new[] { s }))
        .StartWith(_input
            .Buffer(_boundary)
            .Select(l => l.ToArray())
        )
        .Switch()
    )
);

autoBufferedInput2 uses the .1 second inference method until the first buffered list, then switches to simply selecting out and wrapping values in an array.


EDIT: If you want an absolute 1 second gate as well, then the snippets would look like this:

var autoBufferedInput1 = input.Publish(_input => _input
    .Buffer(
        Observable.Merge(
            Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default),
            _input.Throttle(TimeSpan.FromSeconds(.1)).Select(_ => Unit.Default)
        )
    )
    .Select(l => l.ToArray())
);

var autoBufferedInput2 = input.Publish(_input =>
    Observable.Merge(
        _input.Throttle(TimeSpan.FromSeconds(.1)).Select(_ => Unit.Default),
        Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => Unit.Default)
    )
    .Publish(_boundary => _boundary
        .Take(1)
        .Select(_ => _input.Select(s => new[] { s }))
        .StartWith(_input
            .Buffer(_boundary)
            .Select(l => l.ToArray())
        )
        .Switch()
    )
);

Upvotes: 2

Aaron

Reputation: 652

For any IObservable<T>, you'd need to do:

var sequence = ongoingSequence.StartWith(initialSequence);

Upvotes: 0
