Matthew Finlay

Reputation: 3464

Neater way to switch between two IObservables based on a third

I have two value streams and one selector stream and I'd like to produce a result stream that alternates between the value streams based on the selector. The code below gives the right result, but I don't like it.

Does anyone have anything neater?

var valueStreamA = new BehaviorSubject<int>(0);
var valueStreamB = new BehaviorSubject<int>(100);
var selectorStream = new BehaviorSubject<bool>(true);

var filteredA = valueStreamA.CombineLatest(selectorStream, (a, c) => new { A = a, C = c })
  .Where(ac => ac.C)
  .Select(ac => ac.A);
var filteredB = valueStreamB.CombineLatest(selectorStream, (b, c) => new { B = b, C = c })
  .Where(bc => !bc.C)
  .Select(bc => bc.B);

var result = Observable.Merge(filteredA, filteredB);
result.Subscribe(Console.WriteLine);

valueStreamA.OnNext(1);
valueStreamB.OnNext(101);
selectorStream.OnNext(false);

valueStreamA.OnNext(2);
valueStreamB.OnNext(102);
selectorStream.OnNext(true);

This produces the following output:

0
1
101
102
2

Upvotes: 1

Views: 98

Answers (3)

H_Andr

Reputation: 179

Now much closer to your original question:

void Main()
{
var valueStreamA = new BehaviorSubject<int>(0);
var valueStreamB = new BehaviorSubject<int>(100);

var selectorStreamA = valueStreamA.Select(id => Tuple.Create("A", id)).Publish();
var selectorStreamB = valueStreamB.Select(id => Tuple.Create("B", id)).Publish();
var selectorStream = new BehaviorSubject<bool>(true);

var query = from selector in selectorStream
            select from merged in selectorStreamA.Merge(selectorStreamB)
                   where selector == true ? merged.Item1 == "A" : merged.Item1 == "B"
                   select merged.Item2;

query.Switch().Subscribe(Console.WriteLine);

selectorStreamA.Connect();
selectorStreamB.Connect();

//First we get 0 output (because we are already using stream A, and it has a first value)
valueStreamA.OnNext(1); //This is output, because our selector remains as 'A'
valueStreamB.OnNext(101); //This is ignored - because we don't take from B
selectorStream.OnNext(false); //Switch to B

valueStreamA.OnNext(2); //Ignored - we are now using B only
valueStreamB.OnNext(102); //This is output
selectorStream.OnNext(true); //Switch back to A.

}

Outputs:

0
1
102

Upvotes: 0

JerKimball

Reputation: 16944

I'd do something like this:

var a = new BehaviorSubject<int>(0);
var b = new BehaviorSubject<int>(100);
var c = new BehaviorSubject<bool>(true);

var valueStreamA = a as IObservable<int>;
var valueStreamB = b as IObservable<int>;
var selector = c as IObservable<bool>;

var result = selector
    // for every change in the selector...
    .DistinctUntilChanged()
    // select one of the two value streams
    .Select(change => change ? valueStreamA : valueStreamB)
    // and flatten the resulting wrapped observable
    .Switch();

result.Subscribe(Console.WriteLine);

a.OnNext(1);
b.OnNext(101);
c.OnNext(false);

a.OnNext(2);
b.OnNext(102);
c.OnNext(true);
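
For what it's worth, here is a self-contained sketch of the same query that collects the values into a list, so the result can be compared with the output in the question. It assumes the System.Reactive NuGet package is referenced; the class name SwitchDemo and the list name seen are made up for illustration. Because a BehaviorSubject replays its latest value to each new subscriber, every change of the selector immediately emits the current value of the newly selected stream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; assumes the System.Reactive package is available.
class SwitchDemo
{
    static void Main()
    {
        var a = new BehaviorSubject<int>(0);
        var b = new BehaviorSubject<int>(100);
        var c = new BehaviorSubject<bool>(true);

        // Collect every value the switched stream produces.
        var seen = new List<int>();

        var subscription = c
            .DistinctUntilChanged()
            // Pick one of the two value streams for each selector change.
            .Select(useA => useA ? (IObservable<int>)a : b)
            // Unsubscribe from the previous stream and subscribe to the new one.
            .Switch()
            .Subscribe(value => seen.Add(value));

        a.OnNext(1);
        b.OnNext(101);
        c.OnNext(false); // switches to b, which replays its latest value (101)

        a.OnNext(2);
        b.OnNext(102);
        c.OnNext(true);  // switches back to a, which replays its latest value (2)

        subscription.Dispose();

        Console.WriteLine(string.Join(" ", seen)); // prints "0 1 101 102 2"
    }
}
```

That is the same sequence of values as the CombineLatest version in the question, so the replay on each switch is what makes the two approaches agree.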

Upvotes: 2

H_Andr

Reputation: 179

Could do something like:

// Assumes an enum along the lines of: enum Feeds { Xs, Ys }
var xs = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => Feeds.Xs);
var ys = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => Feeds.Ys);
var selectorSubject = new Subject<Feeds>();

var query = from selector in selectorSubject
                select from merged in xs.Merge(ys)
                where merged == selector
                select merged;

query.Switch().Subscribe(Console.WriteLine);

Call OnNext on your 'selectorSubject' to change the active feed. There are a few differences from your example, but they are easy to get around:

  1. Your question involved a selector of type bool, whereas I have been lazy and reused the Feeds enum in order to allow me to do an easy equality check (where merged == selector). You of course could simply do (where selector ? merged == Xs : merged == Ys), or something like that to evaluate each merged item and discard ones you don't care about (depending on your selector).

Specifically, you would probably want to select not just the integer, but an identifier of the feed. Consider using something like Tuple.Create(), so you get that info with each update: {A - 1}, {B - 101} etc. Your where can then do: where selector ? merged.Item1 == A : merged.Item1 == B //this maps 'true' to feed A

  2. I also used a Switch, which will cause my sample streams to restart because they are not published. You probably want to publish yours and Connect them (make them 'hot'), so a Switch like mine doesn't cause any new side effects in the subscription. You have a subject (which is hot), but the 'behaviour' part means every new subscription is first handed the subject's latest value (which will have replaced the one you passed into the constructor), so each Switch would replay it. Publishing and connecting would prevent that.

Shout if you are still confused. This isn't a full answer, but might give you enough to think about.

Howard.

Upvotes: 0
