Max Smirnov
Max Smirnov

Reputation: 603

combineLatest with variable count of observables

I want combineLatest functional but for variable count of observables.

Something like:

// init combineLatest of three observables
[1, 2, 3]
// first observable produced new value "2"
[2, 2, 3]
// third observable ended
[2, 2]
// first observable produced new value "1"
[1, 2]
// new observable added
[2, 2, 4]

Is it possible in RxJS?

Upvotes: 2

Views: 1455

Answers (2)

Mrk Sef
Mrk Sef

Reputation: 8052

Buffer and combine based on a key

Here's a slight variant of what you're asking for. It works just like mergeAll, only it keeps a buffer and emits the latest for any observable that have emitted so far.

The varient here is that you need to supply string keys for your values to get attached to. You should be able to see how to turn this into array indices if you so choose.

The reason I haven't done this with an array is because there's no much undefined behavior. For example, if the first observable completes and the second observable emits, your elements are all opaquely re-ordered.

Using keys returns control back to the caller, who can just use Object.keys() if they don't care about indices/labels for their data.

Here you are:


interface LabeledObservable<T> {
  label: string,
  stream: Observable<T>
}

interface CombinedLatest<T> {
  [key:string]: T
}

function combineLatestAll<T>(): 
  OperatorFunction<
    LabeledObservable<T>, 
    CombinedLatest<T>
  > 
{
  return source$ => defer(() => {

    const buffer = {};

    return source$.pipe(
      mergeMap(({label, stream}) => stream.pipe(
        map(v => {
          buffer[label] = v;
          return {...buffer};
        }),
        finalize(() => {
          delete buffer[label];
        })
      ))
    );

  });
}

Subject for new observables

If you like the idea of a subject you can use to inject new observables into your combineLatest operator, this still allows that. The only alteration needed is that you must supply unique labels for your observables. If you don't care about the labels, you can just use any ID generator pattern (Like incrementing a global id counter or something).

const startingObservables: Observable<any>[] = /*some observables */;
const add = new Subject<LabeledObservable<any>>();

add.pipe(
  combineLatestAll()
).subscribe(console.log);

startingObservables.forEach((stream,i) => {
  add.next({label: "" + i, stream});
});

Upvotes: 1

Picci
Picci

Reputation: 17762

If I understand the problem right, the solution is pretty tricky for something that looks innocent.

I try to go step by step to explain a potential solution.

First of all we need understand that there are 3 different events that we need to manage:

  1. the fact that one Observable completes
  2. the fact that one Observable is added to the array which is given to combineLatest
  3. the fact that a new array of Observables has to be passed to combineLatest, either because we are at the beginning of the processing (i.e. with the initial array) or because we have added a new Observable or because one Observable has completed

The second thing that we need to recognize is that we need to store the array of Observables we pass to combineLatest in a variable, otherwise we are not able to add or remove Obaservables from there.

Once these things are clear, we can build a solution in the form of a function that returns 2 things:

  1. the Observable that we want to subscribe to and that should have the behavior that we are looking for
  2. a Subject that we can use to communicate that we want to add a new Observable to the combineLatest function

The last point we need to recognize is that any time we change the list of Observable, either because we add or because we remove an Observable (because it completed), we need to run the combineLatest function with the new fresh list of Observables.

Now that all this has been clarified, this is the code of the function that returns an Observable which behaves as described

function dynamicCombineLatest(startingObservables: Observable<any>[]) {
  // this is the variable holding the array of Observables
  let observables = startingObservables;

  // this is the array that contains the list of Observables which have been, potentially, transformed to emit
  // immediately the last value emitted - this happens when a new Observable is added to the array
  let observablesPotentiallyWithLastValueImmediatelyEmitted =
    startingObservables;

  // this is the variable holding the array of values last notified by each Observable
  // we will use it when we need to add a new Observable to the list
  const lastValues = [];

  // this are the Subjects used to notify the 3 different types of events
  const start = new BehaviorSubject<Observable<any>[]>(observables);
  const add = new Subject<Observable<any>>();
  const remove = new Subject<Observable<any>>();

  let skipFirst = false;

  // this is the chain of operations which must happen when a new Observable is added
  const addToObservables = add.pipe(
    tap({
      next: (obs) => {
        console.log("add");
        // we need to make sure that the Observables in the list will immediately start to emit
        // the last value they emitted. In this way we are sure that, as soon as the new added Observable emits somthing,
        // the last value emitted by the previous Observables will be considered
        observablesPotentiallyWithLastValueImmediatelyEmitted = observables.map(
          (o, i) => {
            return startWith(lastValues[i])(o);
          }
        );
        // the new Observable is added to the list
        observables.push(obs);
        observablesPotentiallyWithLastValueImmediatelyEmitted.push(obs);
      },
    })
  );
  // this is the chain of operations which must happen when an Observable is removed
  const removeFromObservables = remove.pipe(
    tap({
      next: (obs) => {
        const index =
          observablesPotentiallyWithLastValueImmediatelyEmitted.indexOf(obs);
        console.log("remove");
        // we simply remove the Observable from the list and it "last value"
        observablesPotentiallyWithLastValueImmediatelyEmitted.splice(index, 1);
        observables.splice(index, 1);
        lastValues.splice(index, 1);

        // we make sure that the Observables in the list will immediately start to emit with the last value they emitted
        observablesPotentiallyWithLastValueImmediatelyEmitted = observables.map(
          (o, i) => {
            return lastValues[i] ? startWith(lastValues[i])(o) : o;
          }
        );
        // we set that the first value of the new combineLatest Observable will be skipped
        skipFirst = true;
      },
    })
  );

  // here we merge the 2 chains of operations so that both add and remove logic will be executed
  // when the relative Subjects emit
  merge(addToObservables, removeFromObservables).subscribe({
    next: () => {
      console.log("new start");
      // we notify that a change in the Observable list has occurred and therefore we need to unsubscribe the previous "combineLatest"
      // and subscribe to the new one we are going to build
      start.next(observablesPotentiallyWithLastValueImmediatelyEmitted);
    },
  });

  // this is where we switch to a new Observable, result of the "combineLatest" operation,
  // any time the start Subject emits a new Observable list
  const dynamicObservables = start.pipe(
    switchMap((_observables) => {
      const _observablesSavingLastValueAndSignallingRemove = _observables.map(
        (o, i) =>
          o.pipe(
            tap({
              next: (v) => {
                // here we save the last value emitted by each Observable
                lastValues[i] = v;
              },
              complete: () => {
                // here we notify that the Observable has completed and we need to remove it from the list
                remove.next(o);
              },
            })
          )
      );
      console.log("add or remove");
      // eventually this is the Observable created by combineLatest with the expected array of Observables
      const _combineLatest = combineLatest(
        _observablesSavingLastValueAndSignallingRemove
      );
      const ret = skipFirst ? _combineLatest.pipe(skip(1)) : _combineLatest;
      skipFirst = false;
      return ret;
    })
  );

  // here we return the Observable which will be subscribed to and the add Subject to be used to add new Observables
  return { dynamicObservables, add };
}

You can look at this stackblitz for an example.

Upvotes: 2

Related Questions