James L

Reputation: 16874

How can I Switch() two streams, and emit an intermediate event between them?

I have a Stream of Streams. When one stream ends, and the next begins, I need to emit an intermediate event that bridges them. This needs to occur after the last event of the previous stream, and needs to mutate the state of the last event. Basically, the output stream needs to look like this:

Stream 1 - event 1
Stream 1 - event 2
Stream 1 - event 3  (Stream 1 completes)
* My bridging event (Mutation of event 3)
Stream 2 - event 1 etc

Upvotes: 0

Views: 109

Answers (2)

Timothy Shields

Reputation: 79631

Transform each inner sequence so that a mutated copy of its last element is appended to it, then concatenate the transformed sequences.

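IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
    .Select(innerSource => innerSource
        // Merge (not Concat) inside Publish: LastAsync must be subscribed before
        // the shared stream completes, or it sees an empty sequence and throws
        .Publish(o => o.Merge(o.LastAsync().Select(mutation))))
    .Concat();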

Note this assumes that each inner stream always has at least one item.
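
Here is a runnable sketch of the approach, assuming the System.Reactive NuGet package. The sample streams and the mutation are made up for illustration. It uses Merge inside Publish because Publish is backed by a plain Subject, which does not replay values, so LastAsync has to be subscribed before the inner stream completes.

```csharp
using System;
using System.Reactive.Linq;

class BridgeDemo
{
    static void Main()
    {
        // Hypothetical sample data: two finite inner streams.
        IObservable<IObservable<string>> source = new[]
        {
            Observable.Range(1, 3).Select(i => $"Stream 1 - event {i}"),
            Observable.Range(1, 2).Select(i => $"Stream 2 - event {i}"),
        }.ToObservable();

        // Hypothetical mutation: derive the bridging event from the last event.
        Func<string, string> mutation = last => $"* bridging event (mutation of {last})";

        IObservable<string> query = source
            .Select(inner => inner
                // Share one subscription to the inner stream between the
                // pass-through branch and the LastAsync branch.
                .Publish(o => o.Merge(o.LastAsync().Select(mutation))))
            // Play the transformed inner streams back to back.
            .Concat();

        query.Subscribe(Console.WriteLine);
    }
}
```

Every inner stream gets a bridging event, including the final one, so if you only want it between streams you would need to drop the trailing one yourself.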

Upvotes: 3

Niall Connaughton

Reputation: 16127

It sounds like you may want Concat instead of Switch: one stream ends, you get a bridging value, and then you hook onto the next stream. That is Concat behaviour rather than Switch behaviour.

With Switch, you may get:

  1. Stream A starts
  2. Events from A stream out through your observable
  3. Stream B starts - your observable unsubscribes from A and switches to B

The problem is that if you get a new stream appearing before an old stream completes, then LastAsync on the old stream won't fire and you won't get your bridging event.

If you use Concat, your observable won't move on to Stream B until Stream A completes, and if you use the approach from Timothy Shields, you'll get your mutated last value at the end of A.
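
To see the difference side by side, here is a small sketch, assuming the System.Reactive NuGet package. The subject names and values are made up. Stream B arrives before Stream A has completed.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class SwitchVsConcatDemo
{
    static void Main()
    {
        var outer = new Subject<IObservable<string>>();
        var a = new Subject<string>();
        var b = new Subject<string>();

        outer.Switch().Subscribe(x => Console.WriteLine($"Switch: {x}"));
        outer.Concat().Subscribe(x => Console.WriteLine($"Concat: {x}"));

        outer.OnNext(a);
        a.OnNext("A1");    // both pipelines are listening to A
        outer.OnNext(b);   // B arrives before A completes: Switch drops A here
        a.OnNext("A2");    // only Concat is still subscribed to A
        b.OnNext("B1");    // only Switch is subscribed to B so far
        a.OnCompleted();   // Concat now moves on and subscribes to B
        b.OnNext("B2");    // both pipelines are listening to B
    }
}
```

This should print Switch: A1, Concat: A1, Concat: A2, Switch: B1, Switch: B2, Concat: B2. Switch never sees A complete, which is why a LastAsync on A would not fire there. Concat stays on A until it completes, but because B is a hot subject in this sketch, Concat misses B1.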

However, if you do want to Switch as soon as a new stream arrives, and you want your bridging event at that moment, then it's a little trickier. It might have to be an adapted version of Timothy Shields' answer, something like:

IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
    .Select(innerSource => innerSource
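        .Publish(o => {
            // end this stream when source pushes another value
            var stream = o.TakeUntil(source);
            // Merge subscribes to LastAsync up front; Concat would not subscribe
            // to it until stream had already ended
            return stream.Merge(stream.LastAsync().Select(mutation));
        }))
    .Concat();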

There may be better/cleaner ways of doing this. With Rx, it's always worth trying something that seems complex a few different ways, because sometimes one way is much simpler than the others.

Upvotes: 2
