Alex

Reputation: 7919

"Merging" a stream of streams to produce a stream of the latest values of each

I have an IObservable<IObservable<T>> where each inner IObservable<T> is a stream of values followed by an eventual OnCompleted event.

I would like to transform this into an IObservable<IEnumerable<T>>, a stream consisting of the latest value from each inner stream that has not completed. It should produce a new IEnumerable<T> whenever one of the inner streams produces a new value (or an inner stream expires).

It is most easily shown with a marble diagram (which I hope is comprehensive enough):

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f            

([] is an empty IEnumerable<T> and -| represents the OnCompleted)

You can see that it slightly resembles a CombineLatest operation. I have been playing around with Join and GroupJoin to no avail but I feel that that is almost certainly the right direction to be heading in.

I would like to use as little state as possible in this operator.

Update

I have updated this question to cover more than single-valued sequences: the resultant IObservable<IEnumerable<T>> should include only the latest value from each sequence. If a sequence has not yet produced a value, it should not be included.

Upvotes: 7

Views: 404

Answers (3)

Alex

Reputation: 7919

Another solution, given by Dave Sexton, creator of Rxx. It uses Rxx.CombineLatest, which appears to be quite similar in its implementation to Brandon's solution:

public static IObservable<IEnumerable<T>> CombineLatestEagerly<T>(this IObservable<IObservable<T>> source)
{
  return source
    // Reify completion to force an additional combination:
    .Select(o => o.Select(v => new { Value = v, HasValue = true })
                  .Concat(Observable.Return(new { Value = default(T), HasValue = false })))
    // Merge a completed observable to force combination with the first real inner observable:
    .Merge(Observable.Return(Observable.Return(new { Value = default(T), HasValue = false })))
    .CombineLatest()
    // Filter out completion notifications:
    .Select(l => l.Where(v => v.HasValue).Select(v => v.Value));
}
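
For anyone without Rxx, the "reify completion" idea can also be sketched with standard Rx operators only. This is an illustration under assumptions, not Dave's implementation: MergeLatestViaScan is a hypothetical name, Materialize stands in for the Concat trick above, and the state is copied on every notification for simplicity. An inner stream that completes without ever producing a value will cause the current list to be emitted again.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

public static class ReifiedCompletionSketch
{
    // Hypothetical operator name; not part of Rx or Rxx.
    public static IObservable<IEnumerable<T>> MergeLatestViaScan<T>(
        this IObservable<IObservable<T>> source)
    {
        return source
            // Materialize turns OnNext/OnError/OnCompleted into ordinary values,
            // and each one is tagged with the index of the inner stream it came from.
            .SelectMany((inner, index) => inner
                .Materialize()
                .Select(n => new { Index = index, Notification = n }))
            // Keep the latest value per live inner stream, ordered by stream index.
            .Scan(new SortedDictionary<int, T>(), (state, x) =>
            {
                // Rethrow inner errors; Scan forwards an accumulator exception as OnError.
                if (x.Notification.Kind == NotificationKind.OnError)
                    throw x.Notification.Exception;

                var next = new SortedDictionary<int, T>(state); // copy, so emitted lists stay stable
                if (x.Notification.Kind == NotificationKind.OnNext)
                    next[x.Index] = x.Notification.Value;       // add or replace the latest value
                else
                    next.Remove(x.Index);                       // OnCompleted: drop this stream
                return next;
            })
            .Select(state => state.Values.ToList().AsEnumerable());
    }
}
```

Unlike the versions built on Observable.Create, this one completes once the outer stream and all inner streams have completed.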

Upvotes: 0

Brandon

Reputation: 39222

Here's a version based on your solution from yesterday, tweaked for the new requirements. The basic idea is to put a reference into your perishable collection, and then update the value behind that reference as the inner sequence produces new values.

I also modified it to properly track the inner subscriptions and unsubscribe from them if the outer observable is unsubscribed.

I also modified it to tear everything down if any of the streams produces an error.

Finally, I fixed some race conditions that could violate the Rx Guidelines. If your inner observables fire concurrently from different threads, you could wind up calling obs.OnNext concurrently, which is a big no-no. So I've gated each inner observable using the same lock to prevent that (see the Synchronize call). Note that because of this, you could probably get away with using a regular doubly linked list instead of the PerishableCollection: all of the code using the collection now runs within a lock, so you don't need the threading guarantees of the PerishableCollection.

// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
    public T Value;
    public BoxedValue(T initialValue) { Value = initialValue; }
}

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<BoxedValue<T>>();
        var outerSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(outerSubscription);
        var innerLock = new object();

        outerSubscription.Disposable = source.Subscribe(duration =>
        {
            BoxedValue<T> value = null;
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            var subscription = new SingleAssignmentDisposable();

            subscriptions.Add(subscription);
            subscription.Disposable = duration.Synchronize(innerLock)
                .Subscribe(
                    x =>
                    {
                        if (value == null)
                        {
                            value = new BoxedValue<T>(x);
                            collection.Add(value, lifetime.Lifetime);
                        }
                        else
                        {
                            value.Value = x;
                        }
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                    },
                    obs.OnError, // handle an error in the stream.
                    () => // on complete
                    {
                        if (value != null)
                        {
                            lifetime.Dispose(); // removes the item
                            obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                        }
                        // always release this subscription, even if the stream produced no values
                        subscriptions.Remove(subscription);
                    }
            );
        });

        return subscriptions;
    });
}
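
The linked-list variant mentioned above could look roughly like the sketch below. It is only an illustration under assumptions: MergeLatestLinked and the surrounding class are hypothetical names, the list nodes play the role of BoxedValue<T>, and each emission is a snapshot copy so that subscribers never see the list change underneath them. Like the PerishableCollection version, it does not forward errors or completion from the outer sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class MergeLatestSketch
{
    // Hypothetical name, to keep it apart from the MergeLatest shown above.
    public static IObservable<IEnumerable<T>> MergeLatestLinked<T>(
        this IObservable<IObservable<T>> source)
    {
        return Observable.Create<IEnumerable<T>>(obs =>
        {
            var latest = new LinkedList<T>();   // only touched inside inner callbacks, under the gate
            var gate = new object();
            var outerSubscription = new SingleAssignmentDisposable();
            var subscriptions = new CompositeDisposable(outerSubscription);

            outerSubscription.Disposable = source.Subscribe(inner =>
            {
                LinkedListNode<T> node = null;  // this stream's slot, created on its first value
                var subscription = new SingleAssignmentDisposable();
                subscriptions.Add(subscription);

                subscription.Disposable = inner.Synchronize(gate).Subscribe(
                    x =>
                    {
                        if (node == null) node = latest.AddLast(x);
                        else node.Value = x;             // overwrite in place, O(1)
                        obs.OnNext(latest.ToList());     // emit a snapshot
                    },
                    obs.OnError,
                    () =>
                    {
                        if (node != null)
                        {
                            latest.Remove(node);         // O(1) removal via the node
                            obs.OnNext(latest.ToList());
                        }
                        subscriptions.Remove(subscription);
                    });
            });

            return subscriptions;
        });
    }
}
```

Feeding it two Subject<int> streams a and b (a produces 1, b produces 2, a produces 3, a completes, b completes) should yield [1], [1, 2], [3, 2], [2] and finally an empty list. Values are ordered by when each stream produced its first value.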

Upvotes: 3

Alex

Reputation: 7919

This solution works for one-item streams, but unfortunately it accumulates every item from an inner stream until that stream completes.

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<T>();
        return source.Subscribe(duration =>
        {
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            duration
                .Subscribe(
                    x => // on initial item
                    {
                        collection.Add(x, lifetime.Lifetime);
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value));
                    },
                    () => // on complete
                    {
                        lifetime.Dispose(); // removes the item
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value));
                    }
            );
        });
    });
}

Upvotes: 0
