Reputation: 3740
How would I turn an observable of an enumerable xys
into an enumerable of observables yxs
, where each observable of yxs
focuses on a particular element of each time step of xys
? What I want is similar to transposition for an enumerable of enumerables.
Example:
IObservable<IEnumerable<int>> xys = Observable.Generate(0, _ => true, i => ++i, i => new[] {0 + i, 1 + i, 2 + i});
// xys = emits {0,1,2}, {1,2,3}, ...
IEnumerable<IObservable<int>> yxs = new[]
{
Observable.Generate(0, i=>true, i=> ++i, i=>i),
Observable.Generate(1, i=>true, i=> ++i, i=>i),
Observable.Generate(2, i=>true, i=> ++i, i=>i),
};
// yxs = {emits 0, 1, ...}, {emits 1, 2, ...}, {emits 2, 3, ...}
I'm specifically interested in a function which is already part of Rx. Like the above example, infinite observables should be possible as well as infinite enumerables.
Upvotes: 3
Views: 1125
Reputation: 3740
This is what I came up with so far. However, it's a self made solution, not part of Rx. I would appreciate far more if someone pointed me to a solution involving library functions.
public static IEnumerable<IObservable<T>> ObserveElements<T>(this IObservable<IEnumerable<T>> obs)
{
var i = 0;
while (true)
{
var idx = i++;
yield return from enumerable in obs
let x = enumerable.ElementAtOrDefault(idx)
where !Equals(x, default(T))
select x;
}
}
Obviously, you would .Take()
only as much observables as you need.
In Haskell terms, I think it's actually an implementation of sequence
for the IObservable
monad, specialized to IEnumerable
.
Upvotes: -1
Reputation: 18125
This converts your "types" correctly. However, you can't change the semantics of when/how the elements are delivered underneath, so you're really just going to end up buffering and blocking no matter how you go about this.
dest = source.ToEnumerable().Map(x => x.ToObservable());
Upvotes: 2