Reputation: 11000
I need to get duplicate items of two streams. I think I almost managed to do it, but only if those items that are duplicate of second stream, goes in order. For ex:
This works:
first = Observable.of(1, 2, 3)
second = Observable.of(2, 3, 1)
But this doesn't:
first = Observable.of(1, 4, 3)
second = Observable.of(1, 2, 3)
When my loop gets to the 4, it breaks:
EmptyError {name: "EmptyError", stack: "EmptyError: no elements in sequence↵ at new Emp…e (http://localhost:4200/vendor.bundle.js:161:22)", message: "no elements in sequence"}
Whole my code is in one function, you can copy/paste and test it:
findDublicates() {
let match = 0; // setting it to 0, so later could assign other number
let keys = []; // list of maching keys
let elementAt = 0; // index of item of first observable
let allKeys$;
let validKeys$;
// counting the length of both observables, so this will be the number of loops
// that checks for dublicates
let allKeysLength;
let validKeysLength;
let allKeysLength$ = Observable.of(2, 1, 4, 5, 7).count()
allKeysLength$.subscribe(val => allKeysLength = val)
let validKeysLength$ = Observable.of(1, 2, 3, 8, 5).count()
validKeysLength$.subscribe(val => validKeysLength = val)
let cycles = Math.min(allKeysLength,validKeysLength); // length of the shorter observable
// wrapping it in a function so when called variables will take new values
function defineObs() {
allKeys$ = Observable.of(2, 1, 4, 5, 7)
.elementAt(elementAt).take(1);
validKeys$ = Observable.of(1, 2, 3, 8, 5)
.filter((x) => (x === match)).first();
}
for (var i=0; i<=cycles; i++) {
defineObs();
allKeys$.subscribe(
function (val) { match = val },
function (err) { console.log(err) },
function () { console.log('Done filter')}
);
validKeys$.subscribe(
function (val) { keys.push(val) },
function (err) { console.log(err) },
function () { console.log('Done push')}
);
elementAt += 1;
cycles -= 1;
}
return console.log(keys);
}
Thanks for any help.
Upvotes: 1
Views: 2206
Reputation: 3187
If you don't care about which stream emits the first value of a set of duplicates, you may just merge them and treat as finding duplicate values on a single stream:
first.merge(second)
.scan(([ dupes, uniques ], next) =>
[ uniques.has(next) ? dupes.add(next) : dupes, uniques.add(next) ],
[ new Set(), new Set() ]
)
.map(([ dupes ]) => dupes)
Note: the Sets above are immutable, to avoid undefined behavior in scan
.
Upvotes: 3
Reputation: 607
I would check up on Observable.combineLatest
and the scan
method on an observable sequence.
Here’s what I’m thinking, combine the two observables using combineLatest
and apply the scan
operator on that. You can even use a Set
to ensure uniqueness or even a map
and filter
.
Upvotes: 0