Reputation: 6361
I'm new to Rx. I have my desired scenario working, but it seems to me there must be a simpler or more elegant way to achieve it. I have an IObservable<T> and I want to subscribe to it in such a way that I end up with an IObservable<U>, by triggering an asynchronous operation that generates a U for each T it sees.
What I have so far (it works, but seems cumbersome) uses an intermediate event stream and goes something like this:
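public class Converter {
    public event EventHandler<UArgs> UDone;

    public IConnectableObservable<U> ToUs(IObservable<T> ts) {
        // Expose the UDone event as an observable and replay results to late subscribers.
        var us = Observable.FromEvent<UArgs>(this, "UDone").Select(e => e.EventArgs.U).Replay();
        // Start one background operation per T; each raises UDone when it finishes.
        ts.Subscribe(t => Observable.Start(() => OnUDone(new U(t))));
        return us;
    }

    private void OnUDone(U u) {
        var uDone = UDone;
        if (uDone != null) {
            // The handler expects UArgs, not a bare U (assumes UArgs has a constructor taking a U).
            uDone(this, new UArgs(u));
        }
    }
}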
...
var c = new Converter();
IConnectableObservable<T> ts = ...;
var us = c.ToUs(ts);
us.Connect();
...
I'm sure I'm missing a much simpler way to do this...
Upvotes: 0
Views: 373
Reputation: 84734
This is exactly what SelectMany is for:
IObservable<int> ts = ...;
IObservable<string> us = ts.SelectMany(t => StartAsync(t));
us.Subscribe(u =>
    Console.WriteLine("StartAsync completed with {0}", u));
...
private IObservable<string> StartAsync(int t)
{
    // Stand-in for real asynchronous work: emits the result after one second.
    return Observable.Return(t.ToString())
        .Delay(TimeSpan.FromSeconds(1));
}
Keep in mind that if StartAsync has a variable completion time, you may receive the output values in a different order from the input values.
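If output order matters, one possible alternative is Select followed by Concat, which subscribes to each inner observable only after the previous one completes. The sketch below is an illustration rather than part of the original answer: it assumes the System.Reactive package is referenced, and the OrderingDemo class and the deliberately uneven delays in this StartAsync are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

public static class OrderingDemo
{
    // Hypothetical async operation: smaller inputs take longer,
    // so the operations tend to finish in reverse input order.
    public static IObservable<string> StartAsync(int t)
    {
        return Observable.Return(t.ToString())
            .Delay(TimeSpan.FromMilliseconds(100 * (4 - t)));
    }

    public static void Main()
    {
        IObservable<int> ts = Observable.Range(1, 3);

        // SelectMany merges results as they complete, so the order
        // here depends on how long each operation takes.
        var merged = ts.SelectMany(t => StartAsync(t)).ToList().Wait();

        // Select + Concat keeps the input order, because each inner
        // observable is subscribed to only after the previous one completes.
        var ordered = ts.Select(t => StartAsync(t)).Concat().ToList().Wait();

        Console.WriteLine("Merged:  " + string.Join(",", merged));
        Console.WriteLine("Ordered: " + string.Join(",", ordered)); // 1,2,3
    }
}
```

The trade-off is that, with a cold inner observable like this one, Concat runs the operations one after another instead of concurrently, so ordering is bought at the cost of total elapsed time.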
Upvotes: 0
Reputation: 19117
SelectMany should do what you need: it flattens the IObservable<IObservable<T>> that Select produces.
Observable.Range(1, 10)
    .Select(ii => Observable.Start(() =>
        string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId)))
    .SelectMany(id => id)
    .Subscribe(Console.WriteLine);
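The Select followed by SelectMany(id => id) can probably be collapsed into a single SelectMany that takes the projection directly. A minimal sketch of that form, assuming the System.Reactive package is referenced; the FlattenDemo class and Run method are hypothetical names, and ToList().Wait() is used here only so the program blocks until all ten results have arrived:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class FlattenDemo
{
    // Projects each value to a background operation and flattens
    // the results in one step.
    public static IList<string> Run()
    {
        return Observable.Range(1, 10)
            .SelectMany(ii => Observable.Start(() =>
                string.Format("{0} {1}", ii, Thread.CurrentThread.ManagedThreadId)))
            .ToList()   // collect every result into a single list
            .Wait();    // block until the sequence completes
    }

    public static void Main()
    {
        // Lines may appear in any order; thread ids vary from run to run.
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```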
Upvotes: 1