Reputation: 2627
I'm looking for an Rx operator with the following semantics:
Observable<int> inputValues = …;
Observable<bool> gate = …;
inputValues
.combineLatest(gate)
.filter((pair<int, bool> pair) -> { return pair.second; })
.map((pair<int, bool> pair) -> { return pair.first; });
It emits values from the first Observable while the latest value from the second Observable is true. Using combineLatest, we also get a value when the second Observable becomes true. If we don't want that, we could use withLatestFrom in place of combineLatest.
Does this operator exist (in any Rx implementation)?
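To make the difference between the two variants concrete, here is a minimal sketch in plain Python with no Rx library. The function names and the single merged event list are made up for illustration; they only mimic what the combineLatest and withLatestFrom pipelines would emit.

```python
from typing import List, Tuple, Union

# One merged timeline: ("value", 3) comes from inputValues, ("gate", True) from gate.
Event = Tuple[str, Union[int, bool]]


def gate_combine_latest(events: List[Event]) -> List[int]:
    """Mimics combineLatest + filter(second) + map(first)."""
    out: List[int] = []
    latest_value, has_value = None, False
    latest_gate, has_gate = None, False
    for kind, payload in events:
        if kind == "value":
            latest_value, has_value = payload, True
        else:
            latest_gate, has_gate = payload, True
        # A pair is produced on every event once both sides have emitted;
        # it passes the filter only while the gate is true.
        if has_value and has_gate and latest_gate:
            out.append(latest_value)
    return out


def gate_with_latest_from(events: List[Event]) -> List[int]:
    """Mimics withLatestFrom + filter(second) + map(first)."""
    out: List[int] = []
    gate_open = None  # None until the gate has emitted at least once
    for kind, payload in events:
        if kind == "gate":
            gate_open = payload
        elif gate_open:
            # Only value events trigger an emission.
            out.append(payload)
    return out


events: List[Event] = [
    ("gate", True), ("value", 1), ("value", 2), ("value", 3),
    ("gate", False), ("value", 4), ("value", 5),
    ("gate", True), ("value", 6),
]

print(gate_combine_latest(events))    # [1, 2, 3, 5, 6]
print(gate_with_latest_from(events))  # [1, 2, 3, 6]
```

With combineLatest, the 5 is re-emitted at the moment the gate reopens; with withLatestFrom it is not.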
Upvotes: 0
Views: 95
Reputation: 117029
My apologies for giving you C# code, but here's how this can be written. Hopefully someone can translate for me.
Use the .Switch operator. It turns an IObservable<IObservable<T>> into an IObservable<T> by only returning the values produced by the latest observable returned by the outer observable.
var _gate = new Subject<bool>();
var _inputValues = new Subject<int>();
IObservable<int> inputValues = _inputValues;
IObservable<bool> gate = _gate;
IObservable<int> query =
    gate
        .StartWith(true)
        .Select(g => g ? inputValues : Observable.Never<int>())
        .Switch();
query
    .Subscribe(v => Console.WriteLine(v));
_inputValues.OnNext(1);
_inputValues.OnNext(2);
_inputValues.OnNext(3);
_gate.OnNext(false);
_inputValues.OnNext(4);
_inputValues.OnNext(5);
_gate.OnNext(true);
_inputValues.OnNext(6);
This produces:
1 2 3 6
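For anyone without Rx.NET at hand, the following is a rough, library-free Python sketch of what the StartWith/Select/Switch chain does mechanically. The Subject class and the gate_through helper are hypothetical stand-ins written for this example, not part of any Rx implementation: each gate value drops the current inner subscription and, if the gate is open, subscribes to the values again.

```python
class Subject:
    """Tiny hot subject: pushes each value to its current subscribers only."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

        def unsubscribe():
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    def on_next(self, value):
        # Iterate over a copy so observers may (un)subscribe while notified.
        for observer in list(self._observers):
            observer(value)


def gate_through(values, gate, observer):
    """Hand-rolled equivalent of gate.StartWith(true).Select(...).Switch()."""
    unsubscribe_inner = None

    def switch_to(is_open):
        nonlocal unsubscribe_inner
        # Switch semantics: always drop the previous inner subscription first.
        if unsubscribe_inner is not None:
            unsubscribe_inner()
            unsubscribe_inner = None
        # An open gate selects the values; a closed gate selects "never".
        if is_open:
            unsubscribe_inner = values.subscribe(observer)

    switch_to(True)            # plays the role of StartWith(true)
    gate.subscribe(switch_to)


input_values, gate, received = Subject(), Subject(), []
gate_through(input_values, gate, received.append)

for v in (1, 2, 3):
    input_values.on_next(v)
gate.on_next(False)
for v in (4, 5):
    input_values.on_next(v)
gate.on_next(True)
input_values.on_next(6)

print(received)  # [1, 2, 3, 6]
```

The values 4 and 5 are lost simply because nothing is subscribed to the values while the gate is closed.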
If inputValues is a ReplaySubject then you can do this to make the query work correctly:
IObservable<int> query =
    inputValues
        .Publish(i =>
            gate
                .StartWith(true)
                .Select(g => g ? i : Observable.Never<int>())
                .Switch());
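As a hedged illustration of why the shared subscription matters, here is a library-free Python sketch. Subject, ReplaySubject, gate_through and gate_through_shared are simplified stand-ins invented for this example; the last one only approximates what Publish does by keeping a single subscription to the source. Without sharing, every reopening of the gate subscribes to the replaying source again and receives the whole buffer a second time.

```python
class Subject:
    """Tiny hot subject: pushes each value to its current subscribers only."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

        def unsubscribe():
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    def on_next(self, value):
        for observer in list(self._observers):
            observer(value)


class ReplaySubject(Subject):
    """Like Subject, but replays every past value to each new subscriber."""

    def __init__(self):
        super().__init__()
        self._buffer = []

    def subscribe(self, observer):
        for value in self._buffer:
            observer(value)
        return super().subscribe(observer)

    def on_next(self, value):
        self._buffer.append(value)
        super().on_next(value)


def gate_through(source, gate, observer):
    """Resubscribes to source each time the gate opens (the Switch pattern)."""
    unsubscribe_inner = None

    def switch_to(is_open):
        nonlocal unsubscribe_inner
        if unsubscribe_inner is not None:
            unsubscribe_inner()
            unsubscribe_inner = None
        if is_open:
            unsubscribe_inner = source.subscribe(observer)

    switch_to(True)
    gate.subscribe(switch_to)


def gate_through_shared(source, gate, observer):
    """Gates a hot relay that holds the only subscription to source."""
    shared = Subject()
    gate_through(shared, gate, observer)
    source.subscribe(shared.on_next)  # single subscription, so no later replays


def run(gating):
    values, gate, seen = ReplaySubject(), Subject(), []
    gating(values, gate, seen.append)
    for v in (1, 2, 3):
        values.on_next(v)
    gate.on_next(False)
    for v in (4, 5):
        values.on_next(v)
    gate.on_next(True)
    values.on_next(6)
    return seen


print(run(gate_through))         # [1, 2, 3, 1, 2, 3, 4, 5, 6]
print(run(gate_through_shared))  # [1, 2, 3, 6]
```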
Upvotes: 2