Steve D

Reputation: 393

switchMap combined with mergeMap

I have an Observable where each new value should cause an HTTP request. On the client side I only care about the latest response value; however, I want every request to run to completion for monitoring and similar purposes.

What I currently have is something like:

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

function simulate(x) {
  // Simulate an HTTP request that responds after 6 ms.
  return of(x).pipe(delay(6));
}

source$.pipe(
  someMapFunc(x => simulate(x)),
);

When I use switchMap for the someMapFunc, I get the right set of responses (only the latest). However, any request that is still in flight when a new value arrives gets canceled.

When I use mergeMap instead, I get the right set of requests (every request completes), but I get the wrong set of responses (every single one).

[Image: marble diagram of the code above]
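To make the difference concrete, here is a minimal sketch that runs two overlapping requests through each operator. It is an illustration only: the `run` helper, the `Flatten` type, and the `Subject` stand-ins for HTTP requests are hypothetical names, and it assumes operators are imported from 'rxjs/operators' (RxJS 6 or 7).

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { mergeMap, switchMap, tap } from 'rxjs/operators';

// Hypothetical type: something that turns a project function into a flattening operator.
type Flatten = (
  project: (key: string) => Observable<string>
) => OperatorFunction<string, string>;

// Hypothetical helper: starts request "a", then request "b" while "a" is still
// in flight, and records which responses arrive downstream and which requests
// are observed running to completion.
function run(flatten: Flatten) {
  // Subjects stand in for HTTP requests so the timing can be driven by hand.
  const requests: Record<string, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const responses: string[] = [];
  const completed: string[] = [];

  const source$ = new Subject<string>();
  source$.pipe(
    flatten((key) => requests[key].pipe(
      // Only fires if the inner subscription is still alive at completion.
      tap({ complete: () => completed.push(key) })
    ))
  ).subscribe((value) => responses.push(value));

  source$.next('a');
  source$.next('b');
  requests.a.next('response a');
  requests.a.complete();
  requests.b.next('response b');
  requests.b.complete();

  return { responses, completed };
}

const switched = run((project) => switchMap(project));
const merged = run((project) => mergeMap(project));

console.log(switched); // responses: ['response b'], completed: ['b']
console.log(merged);   // responses: ['response a', 'response b'], completed: ['a', 'b']
```

With switchMap the subscription to "a" is dropped as soon as "b" starts, so "a" is never seen completing. With mergeMap both complete, but both responses reach the subscriber.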

Is there a way to get the requests of mergeMap with the responses of switchMap? I know I can write this as a custom operator, but I'm wondering if I can build this out of existing/standard RxJS operators. To summarize what I'm thinking of:

Edit: Based on the accepted answer, I was able to get the following, which works:

function orderedMergeMap(project) {
  return (s) => defer(() => {
    let recent = 0;
    return s.pipe(
      mergeMap((data, idx) => {
        recent = idx;
        return project(data).pipe(filter(() => idx === recent));
      })
    );
  });
}
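For anyone who wants to try it, here is a rough usage sketch of that operator. The `Subject` stand-ins for requests and the `completed`/`received` arrays are hypothetical, added only to make the behavior observable; the operator body is the same as above with type annotations, and the imports assume RxJS 6 or 7.

```typescript
import { Observable, OperatorFunction, Subject, defer } from 'rxjs';
import { filter, mergeMap, tap } from 'rxjs/operators';

function orderedMergeMap<T, R>(project: (v: T) => Observable<R>): OperatorFunction<T, R> {
  return (s) => defer(() => {
    let recent = 0; // index of the most recently started inner source
    return s.pipe(
      mergeMap((data, idx) => {
        recent = idx;
        // Keep the inner subscription alive, but drop its values once a newer one has started.
        return project(data).pipe(filter(() => idx === recent));
      })
    );
  });
}

// Hypothetical stand-ins for two in-flight HTTP requests.
const requests: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>(),
};
const completed: string[] = []; // requests observed running to completion
const received: string[] = [];  // responses that reach the subscriber

const source$ = new Subject<string>();
source$.pipe(
  orderedMergeMap((key) => requests[key].pipe(
    tap({ complete: () => completed.push(key) })
  ))
).subscribe((value) => received.push(value));

source$.next('a');
source$.next('b');              // "b" starts while "a" is still in flight
requests.a.next('response a');  // stale: filtered out
requests.a.complete();          // but "a" still completes
requests.b.next('response b');
requests.b.complete();

console.log(received);  // ['response b']
console.log(completed); // ['a', 'b']
```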

Upvotes: 3

Views: 1432

Answers (2)

Mrk Sef

Reputation: 8022

I'm not 100% sure if this is what you're after, and I haven't fully tested this, but I created a custom operator that might do something close to what you're after. Maybe you can tinker with it a bit more.

This is a mergeMap that filters out "old" values. A value counts as old if it is emitted by an inner source after a newer inner source has already started to emit.

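import { Observable, OperatorFunction, defer } from 'rxjs';
import { filter, map, mergeMap, tap } from 'rxjs/operators';

function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => {
    // Highest source index that has emitted so far.
    let recent = 0;
    return s.pipe(
      // Tag each source value with its index.
      map((v, i) => ({order: i, payload: v})),
      mergeMap(({order, payload}) => project(payload).pipe(
        map(v => ({order, payload: v}))
      )),
      tap(({order}) => {
        if(order > recent) recent = order;
      }),
      // Keep a value only if no newer source has emitted yet.
      filter(({order}) => order >= recent),
      map(({payload}) => payload)
    );
  });
}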
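Here is a small sketch of how this index-tagging variant behaves. It assumes the intent is to keep a value only while no newer source has emitted, i.e. the final filter passes `order >= recent`. The `Subject` stand-ins and the `received` array are hypothetical, and the imports assume RxJS 6 or 7.

```typescript
import { Observable, OperatorFunction, Subject, defer } from 'rxjs';
import { filter, map, mergeMap, tap } from 'rxjs/operators';

function orderedMergeMap<T, R>(project: (v: T) => Observable<R>): OperatorFunction<T, R> {
  return (s) => defer(() => {
    let recent = 0; // highest source index that has emitted so far
    return s.pipe(
      map((v, i) => ({ order: i, payload: v })),
      mergeMap(({ order, payload }) => project(payload).pipe(
        map((v) => ({ order, payload: v }))
      )),
      tap(({ order }) => {
        if (order > recent) recent = order;
      }),
      // Assumption: keep a value only if no newer source has emitted yet.
      filter(({ order }) => order >= recent),
      map(({ payload }) => payload)
    );
  });
}

// Hypothetical stand-ins for two in-flight requests.
const requests: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>(),
};
const received: string[] = [];

const source$ = new Subject<string>();
source$.pipe(
  orderedMergeMap((key) => requests[key])
).subscribe((value) => received.push(value));

source$.next('a');
source$.next('b');
requests.a.next('a1'); // passes: "b" has started but not emitted yet
requests.b.next('b1'); // passes and marks "b" as the most recent emitter
requests.a.next('a2'); // dropped: a newer source has already emitted

console.log(received); // ['a1', 'b1']
```

Note that "recent" here tracks the newest source that has emitted, not the newest source that has started, so a response from an older request still gets through if it arrives before the newer request responds.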

The version OP settled on:

function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => {
    let recent = 0;
    return s.pipe(
      mergeMap((data, idx) => {
        recent = idx;
        return project(data).pipe(
          filter(() => idx === recent)
        );
      })
    );
  });
}

Upvotes: 4

Tonnio

Reputation: 655

I believe that you need a combination of concatMap() and last().

concatMap does not subscribe to the next observable until the previous one completes, so the requests execute in order. As its description implies, it does not cancel earlier subscriptions but lets them finish, unlike switchMap.

last emits only the final value from the source, and only once the source completes. This ensures that a single (last) result is passed downstream.

Your code will look like this:

source$.pipe(
  concatMap(x => simulate(x)),
  last()
);
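A quick sketch of how this pipeline behaves over time, in case it helps. The `Subject` stand-ins for requests and the bookkeeping arrays are hypothetical; the imports assume RxJS 6 or 7.

```typescript
import { Subject, defer } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';

// Hypothetical stand-ins for two HTTP requests.
const requests: Record<string, Subject<string>> = {
  a: new Subject<string>(),
  b: new Subject<string>(),
};
const subscribedTo: string[] = []; // order in which requests are actually started
const received: string[] = [];

const source$ = new Subject<string>();
source$.pipe(
  concatMap((key) => defer(() => {
    subscribedTo.push(key); // runs only when concatMap subscribes to this request
    return requests[key];
  })),
  last()
).subscribe((value) => received.push(value));

source$.next('a');
source$.next('b');
// "b" is queued: concatMap waits for "a" to complete before starting it.
const startedWhileAInFlight = subscribedTo.slice(); // ['a']

requests.a.next('response a');
requests.a.complete(); // now "b" is subscribed
requests.b.next('response b');
requests.b.complete();
// Nothing has been emitted yet, because source$ itself has not completed.
const receivedBeforeSourceCompletes = received.slice(); // []

source$.complete();
console.log(received); // ['response b']
```

So every request runs to completion and only one value comes out, but the requests run one after another, and the value is delivered only when source$ completes.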

Upvotes: 2
