Sebastian Graf

Reputation: 3740

Turn an IObservable<IEnumerable<T>> into an IEnumerable<IObservable<T>>

How would I turn an observable of an enumerable xys into an enumerable of observables yxs, where each observable of yxs focuses on a particular element of each time step of xys? What I want is similar to transposition for an enumerable of enumerables.

Example:

IObservable<IEnumerable<int>> xys = Observable.Generate(0, _ => true, i => ++i, i => new[] {0 + i, 1 + i, 2 + i});
// xys = emits {0,1,2}, {1,2,3}, ...
IEnumerable<IObservable<int>> yxs = new[]
{
    Observable.Generate(0, i=>true, i=> ++i, i=>i),
    Observable.Generate(1, i=>true, i=> ++i, i=>i),
    Observable.Generate(2, i=>true, i=> ++i, i=>i),
};
// yxs = {emits 0, 1, ...}, {emits 1, 2, ...}, {emits 2, 3, ...}

I'm specifically interested in a function which is already part of Rx. Like the above example, infinite observables should be possible as well as infinite enumerables.

Upvotes: 3

Views: 1125

Answers (2)

Sebastian Graf

Reputation: 3740

This is what I have come up with so far. It is a self-made solution rather than part of Rx, so I would much prefer an answer that relies on library functions.

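public static IEnumerable<IObservable<T>> ObserveElements<T>(this IObservable<IEnumerable<T>> obs)
{
    var i = 0;
    while (true)
    {
        // Copy the counter so that each deferred query captures its own index.
        var idx = i++;
        // Skip/Take yields the element at idx if it exists and nothing otherwise,
        // so legitimate default values (e.g. 0 for int) are not filtered out,
        // which a comparison against default(T) would do.
        yield return obs.SelectMany(enumerable => enumerable.Skip(idx).Take(1));
    }
}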

Obviously, you would .Take() only as many observables as you need.
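To illustrate how this could be used, here is a minimal sketch, assuming the System.Reactive package is referenced. The names TransposeDemo and Run are hypothetical and only exist for this example. The snippet carries its own variant of the extension method, one that uses SelectMany with Skip/Take instead of a default-value filter, so that it compiles on its own. A Subject stands in for the source observable so the emissions can be pushed by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; not part of Rx.
public static class TransposeDemo
{
    // Variant of the extension method: one observable per element position.
    public static IEnumerable<IObservable<T>> ObserveElements<T>(
        this IObservable<IEnumerable<T>> obs)
    {
        var i = 0;
        while (true)
        {
            var idx = i++; // each query captures its own index
            yield return obs.SelectMany(xs => xs.Skip(idx).Take(1));
        }
    }

    // Subscribes to the first three "columns" and collects what each one sees.
    public static List<List<int>> Run()
    {
        var xys = new Subject<IEnumerable<int>>();
        var received = new List<List<int>>();

        // Take(3) bounds the otherwise infinite enumerable of observables.
        foreach (var column in xys.ObserveElements().Take(3))
        {
            var sink = new List<int>();
            received.Add(sink);
            column.Subscribe(x => sink.Add(x));
        }

        // The subject is hot, so values are pushed after subscribing.
        xys.OnNext(new[] { 0, 1, 2 });
        xys.OnNext(new[] { 1, 2, 3 });
        xys.OnCompleted();

        return received;
    }
}
```

Note that every returned observable subscribes to the source separately, so with a cold source each "column" would replay the whole sequence on its own. Sharing a single subscription (for example via Publish) may be preferable in that case.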

In Haskell terms, I think it's actually an implementation of sequence for the IObservable monad, specialized to IEnumerable.

Upvotes: -1

cwharris

Reputation: 18125

This converts your "types" correctly. However, you can't change the semantics of when/how the elements are delivered underneath, so you're really just going to end up buffering and blocking no matter how you go about this.

// Select is the LINQ projection operator; neither LINQ nor Rx defines Map.
var dest = source.ToEnumerable().Select(x => x.ToObservable());

Upvotes: 2
