Patrick

Reputation: 1291

How to achieve an RxJS operator like concatMap() with a queue size of 1?

concatMap() exhausts its input observables in order: it moves on to the next input observable only once the current one completes, and any input observables that arrive in the meantime are queued and processed one by one. This is almost what I want.

I want a slightly different behavior: I want an operator that does the same as concatMap(), but with a queue length of 1. That is, an operator that will exhaust the current observable, but while doing that, keep only the most recent new input observable, instead of queueing them all. How can I achieve this?

Edit: switchMap() is not a solution, because I don't want to cancel the current observable when the next one arrives. I want to exhaust it and then move on to the most recent next one.

Edit 2: exhaustMap() is not a solution, because it will ignore subsequent observables as long as one is still being consumed.
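
To make the difference concrete, here is a small sketch that drives the three built-in operators with the same hand-timed sequence. It assumes RxJS 6.5+ or 7; the `trace` helper and the subjects `a`, `b`, `c` are made up for the illustration. The behavior I am after would produce `a1, c2, c3` for this sequence: `a` runs to completion, `b` is dropped, and `c` starts when `a` completes.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, exhaustMap, switchMap } from 'rxjs/operators';

type Flatten = (
  project: (key: string) => Observable<string>
) => OperatorFunction<string, string>;

// Hypothetical helper: replays one fixed sequence of events through `flatten`
// and returns everything that reached the subscriber.
function trace(flatten: Flatten): string[] {
  const source = new Subject<string>();
  const inner: Record<string, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
    c: new Subject<string>(),
  };
  const seen: string[] = [];
  source.pipe(flatten((key) => inner[key])).subscribe((v) => seen.push(v));

  // Three inner observables arrive before the first one has finished.
  ['a', 'b', 'c'].forEach((key) => source.next(key));
  // Subjects are hot: a value is lost if nobody is subscribed at that moment.
  ['a', 'b', 'c'].forEach((key) => inner[key].next(key + '1'));
  inner.a.complete();
  inner.b.next('b2');
  inner.c.next('c2');
  inner.b.complete();
  inner.c.next('c3');
  inner.c.complete();
  return seen;
}

const results = {
  concatMap: trace((p) => concatMap(p)),   // ['a1', 'b2', 'c3']: b is still run
  switchMap: trace((p) => switchMap(p)),   // ['c1', 'c2', 'c3']: a is cancelled
  exhaustMap: trace((p) => exhaustMap(p)), // ['a1']: b and c are ignored
};
console.log(results);
```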

Upvotes: 0

Views: 250

Answers (1)

Steve

Reputation: 602

Here you go. Create a custom operator based on mergeMap. Cache the latest observable and, once the active observable completes, push the cached one into the stream. Repeat until nothing is cached.

Full example here.

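import { BehaviorSubject, EMPTY, Observable, ObservedValueOf, OperatorFunction, defer } from 'rxjs';
import { mergeAll, mergeMap, tap } from 'rxjs/operators';

function concatLatest<T, O extends Observable<unknown>>(
  project: (value: T, index: number) => O
): OperatorFunction<T, ObservedValueOf<O>> {
  // Note: this state lives in the operator instance, so it is shared by
  // every subscription to the resulting observable.
  // Feeds inner observables to mergeAll() while a run is in progress.
  let subj: BehaviorSubject<O> | null = null;
  // Most recent inner observable that arrived while another one was active.
  let latestObservable: O | null = null;

  return mergeMap((val, i) => {
    // defer() postpones the call to project() until the observable is run.
    const inner = defer(() =>
      project(val, i).pipe(
        tap({
          complete: () => {
            if (latestObservable) {
              // Empty the slot before emitting, so an inner observable that
              // completes synchronously does not find itself still cached.
              const pending = latestObservable;
              latestObservable = null;
              subj?.next(pending);
            } else {
              subj?.complete();
              subj = null;
            }
          },
        })
      )
    ) as O;

    if (subj) {
      // Something is still running: keep only the newest candidate.
      latestObservable = inner;
      return EMPTY;
    }

    // Nothing is running: start right away and leave the slot empty,
    // otherwise the first observable would be run a second time.
    subj = new BehaviorSubject(inner);

    return subj.pipe(mergeAll());
  });
}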
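
The operator above keeps its cache in the closure of the operator itself, so two subscribers to the same piped observable would share it. If that matters, a variation built directly on `new Observable` keeps the state per subscription. This is only a sketch of the same keep-latest idea, not the code from the linked example: the name `concatLatestPerSubscription` is made up, it assumes RxJS 6.5+ or 7, and for brevity `project` has to return an `Observable`.

```typescript
import { Observable, OperatorFunction, Subject, Subscription } from 'rxjs';

// Hypothetical name: this operator is not part of RxJS.
function concatLatestPerSubscription<T, R>(
  project: (value: T, index: number) => Observable<R>
): OperatorFunction<T, R> {
  return (source) =>
    new Observable<R>((subscriber) => {
      const subs = new Subscription();
      let index = 0;
      let active = false; // true while an inner observable is running
      let sourceDone = false; // true once the outer source has completed
      let pending: { value: T; index: number } | null = null; // one-slot queue

      const run = (value: T, i: number): void => {
        active = true;
        let inner: Observable<R>;
        try {
          inner = project(value, i);
        } catch (err) {
          subscriber.error(err);
          return;
        }
        subs.add(
          inner.subscribe({
            next: (v) => subscriber.next(v),
            error: (err) => subscriber.error(err),
            complete: () => {
              active = false;
              if (pending) {
                const next = pending;
                pending = null; // empty the slot before starting the next run
                run(next.value, next.index);
              } else if (sourceDone) {
                subscriber.complete();
              }
            },
          })
        );
      };

      subs.add(
        source.subscribe({
          next: (value) => {
            const i = index++;
            if (active) {
              pending = { value, index: i }; // overwrite: keep only the newest
            } else {
              run(value, i);
            }
          },
          error: (err) => subscriber.error(err),
          complete: () => {
            sourceDone = true;
            if (!active) {
              subscriber.complete();
            }
          },
        })
      );

      return subs; // unsubscribing tears down the source and the active inner
    });
}

// Usage with hand-driven subjects: 'b' is replaced by 'c' while 'a' is running.
const source = new Subject<string>();
const inner: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>(),
  c: new Subject<string>(),
};
const seen: string[] = [];
source.pipe(concatLatestPerSubscription((key) => inner[key])).subscribe({
  next: (v) => seen.push(v),
  complete: () => seen.push('done'),
});
['a', 'b', 'c'].forEach((key) => source.next(key));
inner.a.next('a1');
inner.c.next('c1'); // lost: 'c' is not subscribed until 'a' completes
inner.a.complete();
inner.c.next('c2');
inner.c.complete();
source.complete();
console.log(seen); // ['a1', 'c2', 'done']
```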

Upvotes: 2
