Reputation: 926
I want to implement the relative complement of two finite Observable sequences, and this is the best I have so far:
function relativeComplement (setA, setB, cmp) {
  return setA.concatMap(objA =>
    setB
      .reduce((acc, cur) => {
        if (cmp(objA, cur)) {
          return {val: objA, skip: true}
        }
        return acc
      }, {val: objA, skip: false})
      .filter(obj => !obj.skip)
      .map(obj => obj.val)
  )
}
The example works but has two issues that I haven't been able to overcome. First, I would like to use scan instead of reduce, because once skip has been set to true there is no point in continuing through the rest of setB.
The second problem is the one that bothers me the most.
If setB is a cold observable, it is subscribed to once for every element of setA, so it is "constructed", and any side effects attached to it run, setA.length times.
Here is a jsbin that shows the problem.
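The repeated construction can also be reproduced without RxJS. This is only a minimal sketch: coldSource is a hypothetical hand-rolled stand-in for a cold observable (it is not part of the RxJS API), and the comparison is plain strict equality instead of cmp.

```javascript
// Hypothetical stand-in for a cold observable (NOT the RxJS API):
// every call to subscribe() re-runs the producer from scratch.
function coldSource(values, onConstruct) {
  return {
    subscribe(next) {
      onConstruct();         // side effect attached to construction
      values.forEach(next);  // emit every value synchronously
    }
  };
}

let constructions = 0;
const setA = [1, 2, 3, 4];
const setB = coldSource([2, 4], () => { constructions += 1; });

// Mirrors the concatMap + reduce approach:
// one subscription to setB per element of setA.
const result = [];
setA.forEach(objA => {
  let skip = false;
  setB.subscribe(objB => { if (objA === objB) skip = true; });
  if (!skip) result.push(objA);
});

console.log(result);         // [1, 3]
console.log(constructions);  // 4, one construction per element of setA
```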
So I have two questions.
Do you see another way to implement the relative complement that overcomes these two problems?
Can I cache the results of setB so it doesn't replay the construction and side effects?
NOTE: I'm using RxJS 5 alpha, and it doesn't have a replay method on the Observable prototype.
Upvotes: 0
Views: 123
Reputation: 18665
If you go with the idea of converting the observables to arrays, and supposing you have a function relativeComplementArray:
function relativeComplement (setA, setB, cmp) {
  return Rx.Observable.forkJoin(setA.toArray(), setB.toArray(), function (arrayA, arrayB) {
    return relativeComplementArray(arrayA, arrayB, cmp);
  })
}
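The relativeComplementArray helper is only assumed above. One possible sketch of it, in plain JavaScript with no RxJS involved, could look like the following; the byId comparator and the sample data are made up for illustration. Because some() stops at the first match, this version also avoids scanning the rest of arrayB once a match is found.

```javascript
// Hypothetical helper: keeps the elements of arrayA for which
// no element of arrayB matches under cmp.
function relativeComplementArray(arrayA, arrayB, cmp) {
  // some() returns as soon as cmp yields true, so the remainder
  // of arrayB is not visited for that element of arrayA
  return arrayA.filter(objA => !arrayB.some(objB => cmp(objA, objB)));
}

// Illustrative comparator and data (assumptions, not from the question)
const byId = (a, b) => a.id === b.id;
const complement = relativeComplementArray(
  [{ id: 1 }, { id: 2 }, { id: 3 }],
  [{ id: 2 }],
  byId
);

console.log(complement.map(o => o.id)); // [1, 3]
```

Note that with a helper like this the result selector returns an array, so the resulting observable would emit a single array rather than the individual elements.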
For a version with replay functionality, it is a bit more complicated to use it inside a function, because you need to work not on setB but on setB.shareReplay().
I propose a curried function, so that the shared setB is created once and reused for every setA:
function relativeComplement ( setB, cmp ) {
  var sharedSetB = setB.shareReplay();
  return function ( setA ) {
    return Rx.Observable.forkJoin(setA.toArray(), sharedSetB.toArray(), function ( arrayA, arrayB ) {
      return relativeComplementArray(arrayA, arrayB, cmp);
    })
  }
}
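To illustrate why working on a shared, replayed source helps, here is a rough sketch of the idea in plain JavaScript. Both coldSource and replayAll are hypothetical hand-rolled helpers, not RxJS operators, and this only handles synchronous sources; it is meant to show the caching behaviour, not how shareReplay is implemented.

```javascript
// Hypothetical stand-in for a cold observable (NOT the RxJS API):
// every subscribe() call re-runs the producer.
function coldSource(values, onConstruct) {
  return {
    subscribe(next) {
      onConstruct();
      values.forEach(next);
    }
  };
}

// Hypothetical replay wrapper: subscribes to the source once,
// caches everything it emitted, and replays the cache afterwards.
function replayAll(source) {
  let cache = null;
  return {
    subscribe(next) {
      if (cache === null) {
        cache = [];
        source.subscribe(v => cache.push(v)); // single real subscription
      }
      cache.forEach(next);
    }
  };
}

let constructions = 0;
const sharedSetB = replayAll(coldSource([2, 4], () => { constructions += 1; }));

const seen = [];
sharedSetB.subscribe(v => seen.push(v));
sharedSetB.subscribe(v => seen.push(v));

console.log(seen);           // [2, 4, 2, 4]
console.log(constructions);  // 1, the source was constructed only once
```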
This is all untested, but hopefully it gets you in the right direction.
Upvotes: 1