Reputation: 1012
I'm having trouble filtering an Observable that is based on a groupBy observable.
Here is an example based on the groupBy() documentation:
var codes = [
  { keyCode: 38 }, // up
  { keyCode: 38 }, // up
  { keyCode: 40 }, // down
  { keyCode: 40 }, // down
  { keyCode: 37 }, // left
  { keyCode: 39 }, // right
  { keyCode: 37 }, // left
  { keyCode: 39 }, // right
  { keyCode: 66 }, // b
  { keyCode: 65 }  // a
];
var source = Rx.Observable.from(codes)
  .groupBy(
    function (x) { return x.keyCode; },
    function (x) { return x.keyCode; }
  )
  .flatMap(obs =>
    obs.count()
      .filter(x => x > 1)
      .flatMap(obs.toArray())
  );
source.subscribe(console.log);
This returns:
[]
[]
[]
[]
So, the Observables are being filtered, but their values are gone.
Upvotes: 1
Views: 1117
Reputation: 1012
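I figured out a solution similar to user3743222's answer:
// Note: `source` here is assumed to be the grouped observable
// (the result of groupBy), not the flat-mapped one from the question.
var filtered = source
  .flatMap(obs => {
    // Running array of every value seen so far in this group
    var accumulated = obs.scan((a, b) => a.concat(b), []);
    return obs
      .scan((a) => a + 1, 0) // running count of the group's values
      .filter(i => i > 1)
      .withLatestFrom(
        accumulated,
        (a, b) => Rx.Observable.from(b)
      );
  })
  .flatMap(d => d)
  .subscribe(d => console.log(d));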
Upvotes: 0
Reputation: 18665
It is tricky to reason about observables as a single object (say, an array) instead of as an asynchronous sequence of values. What happens here is:
- You take the obs observable and apply count to it.
- count waits for the observable to finish, outputs one value (the number of elements), and then finishes.
- You filter that value, but then you apply toArray to obs, which has already finished. That's why you get an empty array.
So the problem is that the same obs variable is a different thing at different times. When you reuse the same variable at different points of your program, you cannot avoid reasoning about time.
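To make the timing point concrete, here is a toy model in plain JavaScript. It is not RxJS and not how the library is implemented; createHotStream and collect are hypothetical helpers I made up for illustration. The only assumption is that a group behaves like a "hot" source: values go to whoever is subscribed at that moment and are not replayed to late subscribers.

```javascript
// Toy model of a "hot" stream: values are pushed to the observers that are
// subscribed at that moment and are never replayed. This is NOT RxJS code.
function createHotStream() {
  var observers = [];
  var done = false;
  return {
    subscribe: function (onNext, onCompleted) {
      if (done) { onCompleted(); return; } // too late: only completion is seen
      observers.push({ onNext: onNext, onCompleted: onCompleted });
    },
    next: function (value) {
      observers.forEach(function (o) { o.onNext(value); });
    },
    complete: function () {
      done = true;
      observers.forEach(function (o) { o.onCompleted(); });
    }
  };
}

// Collect everything a subscriber sees into an array, similar in spirit to toArray().
function collect(stream, callback) {
  var values = [];
  stream.subscribe(
    function (v) { values.push(v); },
    function () { callback(values); }
  );
}

var group = createHotStream();
var early, late;

collect(group, function (values) { early = values; }); // subscribed before the values arrive
group.next(38);
group.next(38);
group.complete();
collect(group, function (values) { late = values; });  // subscribed after completion

console.log(early); // [ 38, 38 ]
console.log(late);  // []
```

The late subscriber plays the role of obs.toArray() in the question: by the time it subscribes, the group has already emitted everything and completed, so it collects nothing.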
Now, if you want to get the count out to the subscriber, you should be able to write obs.count().filter(x => x > 1).toArray() directly instead of what you have right now. flatMap accepts promises and arrays, in addition to observables, as the return value of the selector function:
cf. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md
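UPDATE: with the comments taken into account, this is what I came up with (to be tested):
var source = Rx.Observable
  .from(codes)
  .scan(function (acc, code) {
    var key = code.keyCode;          // count by key code, not by the object itself
    acc[key] = (acc[key] || 0) + 1;  // a counter that is not set yet starts at 0
    acc.latest = key;
    return acc;
  }, {})
  .filter(function (acc) { return acc[acc.latest] > 1; })
  .flatMap(function (acc) {
    // On the second occurrence, emit the value twice to catch up with the first one
    return acc[acc.latest] === 2
      ? Rx.Observable.repeat(acc.latest, 2)
      : Rx.Observable.just(acc.latest);
  });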
If you can wait until all the values have been processed, you can stop the above after the filter and add .last(). This will get you the last accumulator with the count of all values. From there you can do what you want.
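For the "wait until everything is processed" variant, the expected end result can be sanity-checked without Rx. The sketch below is plain JavaScript, not RxJS: it builds one counter per keyCode with Array.prototype.reduce, in the spirit of the accumulator above, and then expands the key codes seen more than once. The names counts and repeated are mine, for illustration only.

```javascript
var codes = [
  { keyCode: 38 }, { keyCode: 38 }, // up, up
  { keyCode: 40 }, { keyCode: 40 }, // down, down
  { keyCode: 37 }, { keyCode: 39 }, // left, right
  { keyCode: 37 }, { keyCode: 39 }, // left, right
  { keyCode: 66 }, { keyCode: 65 }  // b, a
];

// One counter per keyCode; this is the shape of the final accumulator.
var counts = codes.reduce(function (acc, code) {
  var key = code.keyCode;
  acc[key] = (acc[key] || 0) + 1;
  return acc;
}, {});

// Keep only the key codes seen more than once and expand each one
// back into an array with one entry per occurrence.
var repeated = Object.keys(counts)
  .filter(function (key) { return counts[key] > 1; })
  .map(function (key) { return Array(counts[key]).fill(Number(key)); });

console.log(repeated); // [ [ 37, 37 ], [ 38, 38 ], [ 39, 39 ], [ 40, 40 ] ]
```

Note that the groups come out in ascending key order here, because integer-like object keys are enumerated numerically, whereas a streaming version would emit them in the order each key first repeats.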
Upvotes: 2