Reputation: 1490
RxJS provides the sequenceEqual operator to compare two streams in order. How would one go about testing equality of two streams regardless of order?
Pseudocode:
//how do we implement sequenceEqualUnordered?
from([1,2,3]).pipe(sequenceEqualUnordered(from([3,2,1]))).subscribe((eq) =>
console.log("Eq should be true because both observables contain the same values")
)
In my particular use case I need to wait until a certain set of values has been emitted or error but I don't care what order they're emitted in. I just care that each value of interest is emitted once.
Upvotes: 1
Views: 379
Reputation: 2210
Here's my solution:
import { Observable, OperatorFunction, Subscription } from 'rxjs';
export function sequenceEqualUnordered<T>(compareTo: Observable<T>, comparator?: (a: T, b: T) => number): OperatorFunction<T, boolean> {
return (source: Observable<T>) => new Observable<boolean>(observer => {
const sourceValues: T[] = [];
const destinationValues: T[] = [];
let sourceCompleted = false;
let destinationCompleted = false;
function onComplete() {
if (sourceCompleted && destinationCompleted) {
if (sourceValues.length !== destinationValues.length) {
emit(false);
return;
}
sourceValues.sort(comparator);
destinationValues.sort(comparator);
emit(JSON.stringify(sourceValues) === JSON.stringify(destinationValues));
}
}
function emit(value: boolean) {
observer.next(value);
observer.complete();
}
const subscriptions = new Subscription();
subscriptions.add(source.subscribe({
next: next => sourceValues.push(next),
error: error => observer.error(error),
complete: () => {
sourceCompleted = true;
onComplete();
}
}));
subscriptions.add(compareTo.subscribe({
next: next => destinationValues.push(next),
error: error => observer.error(error),
complete: () => {
destinationCompleted = true;
onComplete();
}
}));
return () => subscriptions.unsubscribe();
});
}
As many of RxJS operators have some input parameters and as all of them return functions, sequenceEqualUnordered
also has some input parameter (mostly the same as Rx's sequenceEqual
operator) and it returns a function. And this returned function has the Observable<T>
as the source
type, and has Observable<boolean>
as the return type.
Creating a new Observable that will emit boolean
values is exactly what you need. You'd basically want to collect all the values from both source
and compareTo
Observables (and store them to sourceValues
and destinationValues
arrays). To do this, you need to subscribe to both source
and compareTo
Observables. But, to be able to handle subscriptions, a subscriptions
object has to be created. When creating a new subscriptions to source
and compareTo
, just add
those subscriptions to subscriptions
object.
When subscribing to any of them, collect emitted values to appropriate sourceValues
or destinationValues
arrays in next
handlers. Should any errors happen, propagate them to the observer
in error
handlers. And in complete
handlers, set the appropriate sourceCompleted
or destinationCompleted
flags to indicate which Observable has completed.
Then, in onComplete
check if both of them have completed, and if they all are, compare the emitted values and emit appropriate boolean value. If sourceValues
and destinationValues
arrays don't have the same lengths, they can't equal the same, so emit false
. After that, basically sort the arrays and then compare the two.
When emitting, emit both the value and complete
notification.
Also, the return value of function passed to the new Observable<boolean>
should be the unsubscribe
function. Basically, when someone unsubscribes from new Observable<boolean>
, it should also unsubscribe from both source
and compareTo
Observables and this is done by calling () => subscriptions.unsubscribe()
. subscriptions.unsubscribe()
will unsubscribe from all subscriptions that are add
ed to it.
TBH, I haven't wrote any tests for this operator, so I'm not entirely sure that I have covered all edge cases.
Upvotes: 2
Reputation: 1899
My first idea. Use toArray
on both then zip
them together finally sort and compare results?
Upvotes: 0