Julius Dzidzevičius

Reputation: 11000

RxJs - get duplicate items of two observables

I need to get the duplicate items of two streams. I think I have almost managed it, but it only works if the items that are duplicated in the second stream come in order. For example:

This works:

first = Observable.of(1, 2, 3)
second = Observable.of(2, 3, 1) 

But this doesn't:

first = Observable.of(1, 4, 3)
second = Observable.of(1, 2, 3)

When my loop gets to the 4, it breaks:

EmptyError {name: "EmptyError", stack: "EmptyError: no elements in sequence↵ at new Emp…e (http://localhost:4200/vendor.bundle.js:161:22)", message: "no elements in sequence"}

All of my code is in one function, so you can copy/paste and test it:

findDublicates() {

  let match = 0; // start at 0 so another number can be assigned later
  let keys = []; // list of matching keys
  let elementAt = 0; // index of the current item of the first observable

  let allKeys$;
  let validKeys$;

  // count the length of both observables; this is the number of loops
  // that check for duplicates
  let allKeysLength;
  let validKeysLength;
  let allKeysLength$ = Observable.of(2, 1, 4, 5, 7).count()
    allKeysLength$.subscribe(val => allKeysLength = val)
  let validKeysLength$ = Observable.of(1, 2, 3, 8, 5).count()
    validKeysLength$.subscribe(val => validKeysLength = val)   

  let cycles = Math.min(allKeysLength,validKeysLength); // length of the shorter observable               

  // wrapping it in a function so when called variables will take new values
  function defineObs() {

    allKeys$ = Observable.of(2, 1, 4, 5, 7)
      .elementAt(elementAt).take(1);

    validKeys$ = Observable.of(1, 2, 3, 8, 5)
      .filter((x) => (x === match)).first(); 
   }

  for (var i=0; i<=cycles; i++) {

    defineObs();

    allKeys$.subscribe(
      function (val) { match = val },
      function (err) { console.log(err) },
      function () { console.log('Done filter')}
    );
    validKeys$.subscribe(
      function (val) { keys.push(val) },
      function (err) { console.log(err) },
      function () { console.log('Done push')}
    );

    elementAt += 1;
    cycles -= 1;

  } 

  return console.log(keys);

}

Thanks for any help.

Upvotes: 1

Views: 2206

Answers (2)

concat

Reputation: 3187

If you don't care which stream emits the first value of a set of duplicates, you can merge the two streams and treat the problem as finding duplicate values on a single stream:

first.merge(second)
     .scan(([ dupes, uniques ], next) =>
       [ uniques.has(next) ? dupes.add(next) : dupes, uniques.add(next) ], 
       [ new Set(), new Set() ]
     )
     .map(([ dupes ]) => dupes)

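Note: the Sets above are assumed to be immutable (add returns a new Set rather than modifying the existing one), to avoid undefined behavior in scan. Native JavaScript Sets are mutable: add modifies the Set in place and returns it, so with native Sets the seed would be shared by every subscription.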

Upvotes: 3

Everest

Reputation: 607

I would look into Observable.combineLatest and the scan operator on an observable sequence.

Here is what I'm thinking: combine the two observables using combineLatest and apply the scan operator to the result. You can use a Set to ensure uniqueness, or a map and a filter.
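
One way to read the Set-plus-filter suggestion is sketched below, without RxJS so that it runs on its own. The function name duplicatesOf is hypothetical, and the sketch assumes both streams complete and that their values have already been collected into arrays (in RxJS 5, for instance, by calling toArray() on each stream before combining them). It is an illustration of the idea, not the answerer's exact approach.

```javascript
// Hypothetical helper: returns the values present in both inputs,
// in the order they first appear in firstValues.
function duplicatesOf(firstValues, secondValues) {
  // A Set gives constant-time membership checks for the second stream's values.
  const seen = new Set(secondValues);
  // Deduplicate the first input, then keep only values the second also has.
  return [...new Set(firstValues)].filter((x) => seen.has(x));
}

console.log(duplicatesOf([1, 4, 3], [1, 2, 3])); // [1, 3]
console.log(duplicatesOf([2, 1, 4, 5, 7], [1, 2, 3, 8, 5])); // [2, 1, 5]
```

Unlike the element-by-element loop in the question, a value such as 4 that has no match is simply filtered out, so nothing ever calls first() on an empty sequence.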

Upvotes: 0
