imagio

Reputation: 1490

RxJS test equality of two streams regardless of order

RxJS provides the sequenceEqual operator to compare two streams in order. How would one go about testing equality of two streams regardless of order?

Pseudocode:

// How do we implement sequenceEqualUnordered?
from([1, 2, 3]).pipe(sequenceEqualUnordered(from([3, 2, 1]))).subscribe((eq) =>
  console.log("Eq should be true because both observables contain the same values:", eq)
);

In my particular use case I need to wait until a certain set of values has been emitted (or an error occurs), but I don't care what order they're emitted in. I just care that each value of interest is emitted once.

Upvotes: 1

Views: 379

Answers (2)

Mladen

Reputation: 2210

Here's my solution:

import { Observable, OperatorFunction, Subscription } from 'rxjs';

export function sequenceEqualUnordered<T>(compareTo: Observable<T>, comparator?: (a: T, b: T) => number): OperatorFunction<T, boolean> {
  return (source: Observable<T>) => new Observable<boolean>(observer => {
    const sourceValues: T[] = [];
    const destinationValues: T[] = [];
    let sourceCompleted = false;
    let destinationCompleted = false;

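    // Runs whenever either Observable completes; compares only once both have completed.
    function onComplete() {
      if (sourceCompleted && destinationCompleted) {
        // Different lengths can never be equal, so skip the sort.
        if (sourceValues.length !== destinationValues.length) {
          emit(false);
          return;
        }

        // Without a comparator, sort() orders values by their string form.
        sourceValues.sort(comparator);
        destinationValues.sort(comparator);

        // Values must be JSON-serializable for this comparison to be meaningful.
        emit(JSON.stringify(sourceValues) === JSON.stringify(destinationValues));
      }
    }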

    function emit(value: boolean) {
      observer.next(value);
      observer.complete();
    }

    const subscriptions = new Subscription();

    subscriptions.add(source.subscribe({
      next: next => sourceValues.push(next),
      error: error => observer.error(error),
      complete: () => {
        sourceCompleted = true;
        onComplete();
      }
    }));
    subscriptions.add(compareTo.subscribe({
      next: next => destinationValues.push(next),
      error: error => observer.error(error),
      complete: () => {
        destinationCompleted = true;
        onComplete();
      }
    }));

    return () => subscriptions.unsubscribe();
  });
}

Like most RxJS operators, sequenceEqualUnordered takes input parameters and returns a function. The parameters are mostly the same as those of RxJS's sequenceEqual operator, except that the comparator here is a sort comparator that returns a number rather than a boolean. The returned function takes an Observable<T> as its source and returns an Observable<boolean>.

Creating a new Observable that emits boolean values is exactly what you need. You want to collect all the values from both the source and compareTo Observables and store them in the sourceValues and destinationValues arrays. To do this, you need to subscribe to both Observables. To manage those subscriptions, create a parent subscriptions object and add each new subscription to it.

When subscribing to either of them, collect the emitted values into the sourceValues or destinationValues array in the next handler. Should an error happen, propagate it to the observer in the error handler. In the complete handler, set the sourceCompleted or destinationCompleted flag to record which Observable has completed.

Then, in onComplete, check whether both of them have completed. If they have, compare the collected values and emit the resulting boolean. If the sourceValues and destinationValues arrays don't have the same length, they can't be equal, so emit false. Otherwise, sort both arrays and compare them.
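The sort-then-stringify comparison depends on both arrays ending up in the same order, which may not hold for every value type (plain objects sorted without a comparator, for example, keep their insertion order). As an alternative sketch, and not what the operator above does, the comparison step could count occurrences instead. The name sameMultiset is hypothetical, and the sketch assumes values are compared the way a Map compares keys (primitives by value, objects by reference):

```typescript
// Hypothetical helper: true when both arrays hold the same values
// the same number of times, regardless of order.
// Assumption: equality is Map key equality (objects compare by reference).
function sameMultiset<T>(a: readonly T[], b: readonly T[]): boolean {
  if (a.length !== b.length) {
    return false;
  }

  // Count how often each value occurs in the first array.
  const counts = new Map<T, number>();
  for (const value of a) {
    counts.set(value, (counts.get(value) ?? 0) + 1);
  }

  // Consume one occurrence for every value in the second array.
  for (const value of b) {
    const remaining = counts.get(value);
    if (!remaining) {
      return false; // value missing, or already used up
    }
    counts.set(value, remaining - 1);
  }

  return true;
}

const sameValues = sameMultiset([1, 2, 3], [3, 2, 1]);
const differentCounts = sameMultiset([1, 2, 2], [1, 1, 2]);
```

Because the lengths are checked first, successfully consuming every value of the second array means no occurrences are left over from the first.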

When emitting, emit both the value and complete notification.

Also, the function passed to new Observable<boolean> should return the teardown function. When someone unsubscribes from the new Observable<boolean>, it should also unsubscribe from both the source and compareTo Observables, which is done by returning () => subscriptions.unsubscribe(). Calling subscriptions.unsubscribe() unsubscribes from every subscription that was added to it.
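The teardown behaviour described above can be seen in isolation. This is a minimal sketch, assuming RxJS is installed; the log array and the two teardown callbacks are hypothetical stand-ins for the real source and compareTo subscriptions:

```typescript
import { Subscription } from 'rxjs';

// Records which teardowns have run (illustration only).
const log: string[] = [];

// Parent subscription that owns the child subscriptions.
const parent = new Subscription();

// Stand-ins for the subscriptions to source and compareTo.
parent.add(new Subscription(() => log.push('source torn down')));
parent.add(new Subscription(() => log.push('compareTo torn down')));

// Unsubscribing the parent also unsubscribes everything added to it.
parent.unsubscribe();
```

After the call to parent.unsubscribe(), both teardown callbacks have run and parent.closed is true.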

TBH, I haven't written any tests for this operator, so I'm not entirely sure that I have covered all edge cases.

Upvotes: 2

Konrad Albrecht

Reputation: 1899

My first idea: use toArray on both, zip them together, then sort and compare the results?
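One way that idea might look, as a sketch rather than a tested implementation. The name sequenceEqualUnorderedViaToArray is hypothetical, and the sketch assumes both Observables complete and that, when no comparator is given, the values are primitives of a single type:

```typescript
import { Observable, OperatorFunction, from, zip } from 'rxjs';
import { map, toArray } from 'rxjs/operators';

// Hypothetical operator name; built from toArray + zip + sort.
export function sequenceEqualUnorderedViaToArray<T>(
  compareTo: Observable<T>,
  comparator?: (a: T, b: T) => number
): OperatorFunction<T, boolean> {
  return (source: Observable<T>) =>
    // toArray emits a single array once its Observable completes,
    // so zip emits exactly one [sourceValues, compareToValues] pair.
    zip(source.pipe(toArray()), compareTo.pipe(toArray())).pipe(
      map(([a, b]) => {
        if (a.length !== b.length) {
          return false;
        }
        // Sort copies so the collected arrays are left untouched.
        const sortedA = [...a].sort(comparator);
        const sortedB = [...b].sort(comparator);
        // Compare position by position after sorting.
        return sortedA.every((value, i) =>
          comparator ? comparator(value, sortedB[i]) === 0 : value === sortedB[i]
        );
      })
    );
}

// Usage: from() emits synchronously, so results fill in immediately.
const results: boolean[] = [];
from([1, 2, 3])
  .pipe(sequenceEqualUnorderedViaToArray(from([3, 2, 1])))
  .subscribe(eq => results.push(eq));
from([1, 2, 3])
  .pipe(sequenceEqualUnorderedViaToArray(from([1, 2, 2])))
  .subscribe(eq => results.push(eq));
```

Errors from either Observable are passed through by zip, and if either Observable never completes, nothing is emitted.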

Upvotes: 0
