Reputation: 179109
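I have two observables: one representing a list of checkbox inputs, and one representing a stream of events coming from the server.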
I'd like to filter the second observable using values from the first one.
The values received from the server include a tag property, which corresponds to values in the checkbox list. The observable resulting from the combination of the two would only yield values from the server whose tag property is included in the set of ticked checkboxes.
Upvotes: 39
Views: 27059
Reputation: 111
Expanding on the answer from @Dorus... In Kotlin, you can do it like so:
val observable: Observable<Data> = ...
val filter: Observable<Checkbox> = ...
val filtered: Observable<Data> =
    observable.filterWithLatestFrom(filter) { checkbox -> checkbox.isSelected }
Using the extension function:
/**
 * Return an [Observable] with type [T1] that is filtered using the last event emitted by the [other] observable.
 */
fun <T1 : Any, T2 : Any> Observable<T1>.filterWithLatestFrom(
    other: Observable<T2>,
    filterFunction: (T2) -> Boolean
): Observable<T1> {
    return this.withLatestFrom(other) { obs1, obs2 -> Pair(obs1, obs2) }
        .filter { (_, obs2) -> filterFunction.invoke(obs2) }
        .map { (obs1, _) -> obs1 }
}
Upvotes: 0
Reputation: 7546
You can use withLatestFrom.
source.withLatestFrom(checkboxes, (data, checkbox) => ({data, checkbox}))
  .filter(({data, checkbox}) => ...)
Here, checkboxes is an observable representing a list of checkbox inputs, and source is an observable representing a stream of events coming from the server. In the filter function you can check whether the data is valid given the checkbox settings and let it through.
Note that checkboxes must emit at least one value before the stream can emit anything.
P.S. Unlike the other answers, this solution works even if the source is cold.
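To make the "at least one value" caveat concrete, here is a minimal sketch of the same idea without RxJS. Everything in it is a hand-rolled stand-in: createSubject, filterWithLatest, and the id/tag event shape are assumptions made up for illustration, not part of any library. It only mimics the withLatestFrom-then-filter behaviour described above.

```javascript
// Minimal hand-rolled subject (NOT RxJS), just enough to show the semantics.
function createSubject() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    next(value) { listeners.forEach((fn) => fn(value)); },
  };
}

// Hypothetical helper: forwards a source value only when
// predicate(value, latest) holds, where `latest` is the most recent value
// seen on `other`. Source values that arrive before `other` has emitted
// anything are dropped, which mirrors the caveat above.
function filterWithLatest(source, other, predicate) {
  const out = createSubject();
  let hasLatest = false;
  let latest;
  other.subscribe((value) => { hasLatest = true; latest = value; });
  source.subscribe((value) => {
    if (hasLatest && predicate(value, latest)) out.next(value);
  });
  return out;
}

const checkboxes = createSubject(); // emits arrays of ticked tags
const source = createSubject();     // emits server events carrying a `tag`
const received = [];

filterWithLatest(source, checkboxes, (data, ticked) => ticked.includes(data.tag))
  .subscribe((data) => received.push(data.id));

source.next({ id: 1, tag: 'a' }); // dropped: no checkbox state yet
checkboxes.next(['a']);
source.next({ id: 2, tag: 'a' }); // passes
source.next({ id: 3, tag: 'b' }); // filtered out
checkboxes.next(['a', 'b']);
source.next({ id: 4, tag: 'b' }); // passes

console.log(received); // ids 2 and 4
```

If dropping the early events is not acceptable, seeding the checkbox stream with an initial value (an empty list, say) avoids the gap.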
Upvotes: 45
Reputation: 179109
Apparently, what I needed was a combination of select, filter and switchLatest. I've written a small test case demonstrating this: https://gist.github.com/igstan/d5b8db7b43f49dd87382#file-observable-filter-observable-js-L36-L45
Upvotes: 2
Reputation: 73798
In order to filter stream A using values of stream B, you need to observe stream B and use the latest values to filter stream A.
Use switch() to turn the B observable into an observable that produces values from the A observable.
checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });
Using switch() assumes that dataSource is a hot observable.
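The reason the hot/cold distinction matters is that map plus switch subscribes to the data source again every time the filter changes. The sketch below models that without RxJS. createHot, coldOf and switchFilter are hypothetical helpers written for this illustration only, and an "observable" here is simply a function that takes a listener and returns an unsubscribe function.

```javascript
// Hot stand-in (NOT RxJS): values are pushed to whoever is listening now.
function createHot() {
  const listeners = new Set();
  const observable = (listener) => {
    listeners.add(listener);
    return () => listeners.delete(listener);
  };
  observable.next = (value) => listeners.forEach((fn) => fn(value));
  return observable;
}

// Cold stand-in: every new subscriber gets the whole sequence from the start.
function coldOf(values) {
  return (listener) => {
    values.forEach(listener);
    return () => {};
  };
}

// On each filter change, drop the previous subscription to `data` and
// subscribe again with the new options (the effect of map + switch).
function switchFilter(filters, data, listener) {
  let unsubscribe = () => {};
  return filters((options) => {
    unsubscribe();
    unsubscribe = data((value) => {
      if (options.includes(value)) listener(value);
    });
  });
}

// Hot source: only values arriving after a filter change are tested.
const hotOut = [];
const hotFilters = createHot();
const hotData = createHot();
switchFilter(hotFilters, hotData, (value) => hotOut.push(value));
hotFilters.next(['a']);
hotData.next('a'); // passes
hotData.next('b'); // filtered out
hotFilters.next(['b']);
hotData.next('a'); // filtered out
hotData.next('b'); // passes

// Cold source: every filter change replays the sequence from the start.
const coldOut = [];
const coldFilters = createHot();
switchFilter(coldFilters, coldOf(['a', 'b']), (value) => coldOut.push(value));
coldFilters.next(['a']);      // replays, 'a' passes
coldFilters.next(['b']);      // replays again, 'b' passes
coldFilters.next(['a', 'b']); // replays again, both pass

console.log(hotOut);  // 'a', 'b'
console.log(coldOut); // 'a', 'b', 'a', 'b'
```

With the cold source, old values are delivered again on each change to the checkboxes, which is usually not what a live filter should do.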
Example using interval() to produce dummy data:
var input,
    checkedInputValuesSource,
    dataSource;

input = document.querySelectorAll('input');

// Generate source describing the current filter.
checkedInputValuesSource = Rx.Observable
    .fromEvent(input, 'change')
    .map(function () {
        var inputs = document.querySelectorAll('input'),
            checkedInputValues = [];

        [].forEach.call(inputs, function (e) {
            if (e.checked) {
                checkedInputValues.push(e.value);
            }
        });

        return checkedInputValues;
    })
    .startWith([]);

// Generate random data source (hot).
dataSource = Rx.Observable
    .interval(500)
    .map(function () {
        var options = ['a', 'b', 'c'];

        return options[Math.floor(Math.random() * options.length)];
    })
    .do(function (x) {
        console.log('in: ' + x);
    })
    .share();

// Re-filter the data each time the set of checked inputs changes.
checkedInputValuesSource
    .map(function (options) {
        return dataSource
            .filter(function (value) {
                return options.indexOf(value) !== -1;
            });
    })
    .switch()
    .subscribe(function (x) {
        console.log('out: ' + x);
    });
<script src='https://rawgit.com/Reactive-Extensions/RxJS/v.2.5.3/dist/rx.all.js'></script>
<input type='checkbox' value='a'>
<input type='checkbox' value='b'>
<input type='checkbox' value='c'>
This example will produce output similar to:
in: c
in: a
out: a
in: b
in: c
out: a
in: b
in: a
Here, in reflects all generated input and out the data that passes the filter. The filter is adjusted by ticking the checkbox inputs, whose values are "a", "b" and "c".
Upvotes: 6