Reputation: 16874
I have a Stream of Streams. When one stream ends, and the next begins, I need to emit an intermediate event that bridges them. This needs to occur after the last event of the previous stream, and needs to mutate the state of the last event. Basically, the output stream needs to look like this:
Stream 1 - event 1
Stream 1 - event 2
Stream 1 - event 3 (Stream 1 completes)
* My bridging event (Mutation of event 3)
Stream 2 - event 1 etc
Upvotes: 0
Views: 109
Reputation: 79631
Transform each inner sequence by appending a single mutated copy of its last element, then concatenate the results.
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
    .Select(innerSource => innerSource
        .Publish(o => o.Concat(o.LastAsync().Select(mutation))))
    .Concat();
Note this assumes that each inner stream always has at least one item.
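If an inner stream can be empty, one way around that assumption is to track the last item by hand. The following is only a sketch: `AppendMutatedLast` and `BridgeDemo` are hypothetical names, not part of Rx, and the negating mutation and sample streams are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BridgeExtensions
{
    // Hypothetical helper (not an Rx operator): re-emits every item,
    // then emits mutation(last) just before forwarding completion.
    public static IObservable<T> AppendMutatedLast<T>(
        this IObservable<T> source, Func<T, T> mutation) =>
        Observable.Create<T>(observer =>
        {
            var hasLast = false;
            var last = default(T);
            return source.Subscribe(
                x => { hasLast = true; last = x; observer.OnNext(x); },
                observer.OnError,
                () =>
                {
                    // An empty inner stream produces no bridging event.
                    if (hasLast) observer.OnNext(mutation(last));
                    observer.OnCompleted();
                });
        });
}

public static class BridgeDemo
{
    public static IList<int> Run()
    {
        // Two sample inner streams; the mutation here is simple negation.
        var source = new[] { Observable.Range(1, 3), Observable.Range(10, 2) }.ToObservable();
        return source
            .Select(inner => inner.AppendMutatedLast(x => -x))
            .Concat()
            .ToList()
            .Wait();
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", Run())); // 1, 2, 3, -3, 10, 11, -11
}
```

Note that, like the query above, this also emits a bridging event after the final inner stream.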
Upvotes: 3
Reputation: 16127
It sounds like you may want Concat instead of Switch. It sounds like one stream ends, you get a bridging value, and then you hook onto the next stream. This is more Concat than Switch.
With Switch, the problem is that if a new stream appears before the old stream completes, the old stream is unsubscribed, so LastAsync on it never fires and you won't get your bridging event.
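A small experiment can make this visible. This is only an illustrative sketch: the subjects `outer`, `a` and `b` are made up, and a fixed "bridge" string appended on completion stands in for the mutated last value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static List<string> Run()
    {
        var outer = new Subject<IObservable<string>>();
        var a = new Subject<string>();
        var b = new Subject<string>();
        var seen = new List<string>();

        // Each inner stream appends "bridge" when it completes.
        outer
            .Select(inner => inner.Concat(Observable.Return("bridge")))
            .Switch()
            .Subscribe(seen.Add);

        outer.OnNext(a);
        a.OnNext("A1");
        outer.OnNext(b);   // Switch unsubscribes from a here
        a.OnNext("A2");    // dropped
        a.OnCompleted();   // a's bridge is never emitted
        b.OnNext("B1");
        b.OnCompleted();   // b's bridge is emitted
        outer.OnCompleted();

        return seen;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", Run())); // A1, B1, bridge
}
```

Stream A's completion arrives after the switch, so nothing that depends on it reaches the output.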
If you were using Concat, your observable won't switch to Stream B until Stream A is complete, and if you use the approach from Timothy Shields, you'll get your mutated last value at the end of A.
However, if the situation you have is that you do want to Switch as soon as a new stream arrives, but you want your bridging event at that time, then it's a little more tricky. It might have to be an adapted version of Timothy Shields' answer, something like:
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
    .Select(innerSource => innerSource
        .Publish(o =>
        {
            // End this stream when source pushes another value.
            var stream = o.TakeUntil(source);
            return stream.Concat(stream.LastAsync().Select(mutation));
        }))
    .Concat();
There may be better/cleaner ways of doing this. With Rx, it's always worth trying something that seems complex a few different ways, because sometimes one way is much simpler than the others.
Upvotes: 2