basteln

Reputation: 2627

Rx Operator: Take from Observable whenever latest value from other Observable is true

I'm looking for an Rx operator with the following semantics:

Observable<int> inputValues = …;
Observable<bool> gate = …;

inputValues
    .combineLatest(gate)
    .filter((pair<int, bool> pair) -> { return pair.second; })
    .map((pair<int, bool> pair) -> { return pair.first; });

It emits values from the first Observable while the latest value from the second Observable is true. With combineLatest, the latest input value is also re-emitted whenever the second Observable becomes true. If we don't want that, we could use withLatestFrom in place of combineLatest.
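To make the difference between the two variants concrete, here is a minimal sketch in plain Python with no Rx library. It replays a hypothetical, hand-written timeline of events; the names `gated` and `timeline` are made up for illustration, and the function only models the emission rules described above rather than any real operator implementation.

```python
from typing import List, Tuple, Union

# One event per line of the timeline: ("value", 3) or ("gate", True).
Event = Tuple[str, Union[int, bool]]


def gated(events: List[Event], emit_on_gate_change: bool) -> List[int]:
    """Model combineLatest + filter + map (emit_on_gate_change=True)
    or withLatestFrom + filter + map (emit_on_gate_change=False)."""
    latest_value = None  # nothing received from inputValues yet
    latest_gate = None   # nothing received from gate yet
    out: List[int] = []
    for source, payload in events:
        if source == "value":
            latest_value = payload
            fire = True  # both variants react to a new input value
        else:
            latest_gate = payload
            fire = emit_on_gate_change  # only combineLatest reacts to the gate
        # Both sources must have emitted once, and the gate must be open.
        if fire and latest_value is not None and latest_gate:
            out.append(latest_value)
    return out


timeline: List[Event] = [
    ("gate", True), ("value", 1), ("value", 2),
    ("gate", False), ("value", 3),
    ("gate", True), ("value", 4),
]

print(gated(timeline, emit_on_gate_change=True))   # [1, 2, 3, 4]
print(gated(timeline, emit_on_gate_change=False))  # [1, 2, 4]
```

In the combineLatest variant the value 3, which arrived while the gate was closed, is emitted as soon as the gate reopens; in the withLatestFrom variant it is dropped.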

Does this operator exist (in any Rx implementation)?

Upvotes: 0

Views: 95

Answers (1)

Enigmativity

Reputation: 117029

My apologies for giving you C# code, but here's how this can be written. Hopefully someone can translate for me.

Use the .Switch operator. It turns an IObservable<IObservable<T>> into an IObservable<T> by emitting only the values produced by the most recent inner observable that the outer observable has emitted.

var _gate = new Subject<bool>();
var _inputValues = new Subject<int>();

IObservable<int> inputValues = _inputValues;
IObservable<bool> gate = _gate;

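IObservable<int> query =
    gate
        // Treat the gate as open until it emits its first value.
        .StartWith(true)
        // Open gate: pass inputValues through. Closed gate: a source that never emits.
        .Select(g => g ? inputValues : Observable.Never<int>())
        // Stay subscribed only to the most recently selected inner observable.
        .Switch();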

query
    .Subscribe(v => Console.WriteLine(v));

_inputValues.OnNext(1);
_inputValues.OnNext(2);
_inputValues.OnNext(3);
_gate.OnNext(false);
_inputValues.OnNext(4);
_inputValues.OnNext(5);
_gate.OnNext(true);
_inputValues.OnNext(6);

This produces:

1
2
3
6
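For readers who are not on .NET, the same switching idea can be sketched without any Rx library. This is not a translation to a real Rx implementation: `Subject` and `gate_by_switch` below are hypothetical stand-ins written for this example, and they only model hot subjects with no completion or error handling.

```python
from typing import Callable, List, Optional


class Subject:
    """Tiny hot-observable stand-in: pushes each value to current subscribers."""

    def __init__(self) -> None:
        self._observers: List[Callable] = []

    def subscribe(self, observer: Callable) -> Callable[[], None]:
        self._observers.append(observer)

        def unsubscribe() -> None:
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    def on_next(self, value) -> None:
        # Iterate over a copy so observers may unsubscribe during delivery.
        for observer in list(self._observers):
            observer(value)


def gate_by_switch(input_values: Subject, gate: Subject,
                   observer: Callable, initial: bool = True) -> None:
    """Forward input_values to observer only while the latest gate value is true."""
    inner_unsubscribe: Optional[Callable[[], None]] = None

    def on_gate(is_open: bool) -> None:
        nonlocal inner_unsubscribe
        # Switch: drop the previous inner subscription before picking a new one.
        if inner_unsubscribe is not None:
            inner_unsubscribe()
            inner_unsubscribe = None
        if is_open:
            inner_unsubscribe = input_values.subscribe(observer)
        # A closed gate plays the role of Observable.Never: nothing is subscribed.

    on_gate(initial)         # plays the role of StartWith(true)
    gate.subscribe(on_gate)


received: List[int] = []
gate = Subject()
input_values = Subject()
gate_by_switch(input_values, gate, received.append)

for v in (1, 2, 3):
    input_values.on_next(v)
gate.on_next(False)
input_values.on_next(4)
input_values.on_next(5)
gate.on_next(True)
input_values.on_next(6)

print(received)  # [1, 2, 3, 6]
```

The sketch mirrors the sequence of calls in the C# example, so it yields the same four values.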

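If inputValues is a ReplaySubject, each time the gate turns true the Switch subscribes to it again and the buffered values are replayed. To avoid that, share a single subscription with Publish: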

IObservable<int> query =
    inputValues
        .Publish(i =>
            gate
                .StartWith(true)
                .Select(g => g ? i : Observable.Never<int>())
                .Switch());

Upvotes: 2
