Hernan Rajchert
Hernan Rajchert

Reputation: 926

How to iterate a Cold Observable in a cached way

I wanted to implement the relative complement of two finite Observable sequences, and this is the best that I got so far:

function relativeComplement (setA, setB, cmp) {

    return setA.concatMap(objA =>
                        setB
                            .reduce((acc, cur) => {
                                if (cmp(objA, cur)) {
                                    return {val: objA, skip:true}
                                }
                                return acc
                            }, {val: objA, skip: false})
                            .filter(obj => !obj.skip)
                            .map(obj => obj.val)
                    )
}

The example works but has two issues that I haven't been able to overcome. First, I would like to use scan instead of reduce, because I know that if I already set skip to true, there is no point in continuing the sequence.

The second problem is the one that bothers me the most.

If setB is a cold observable, it will "construct it", or do any side effects it may have attached setA.length number of times.

Here it is a jsbin that shows the problem

So I have two questions.

NOTE: Im using RxJs 5 alpha and it doesn't have a replay method in the observable prototype

Upvotes: 0

Views: 123

Answers (1)

user3743222
user3743222

Reputation: 18665

If you go on the converting the observables in array idea, and supposing you have a function relativeComplementArray :

function relativeComplement (setA, setB, cmp) {
  return Rx.Observable.forkJoin(setA.toArray(), setB.toArray(), function (arrayA, arrayB){
     return relativeComplementArray(arrayA, arrayB, cmp);
  })
}

For a version with replay functionality, it is a bit more complicated to use it inside a function, because you need to work not on setB but on setB.shareReplay().

I propose you a curried function.

function relativeComplement ( setB, cmp ) {
  var sharedSetB = setB.shareReplay();
  return function ( setA ) {
    return Rx.Observable.forkJoin(setA.toArray(), sharedSetB.toArray(), function ( arrayA, arrayB ) {
      return relativeComplementArray(arrayA, arrayB, cmp);
    })
  }
}

This is all untested, but hopefully it gets you in the right direction.

Upvotes: 1

Related Questions