dps

Reputation: 864

Hybrid of mergeMap and concatMap in rxjs

I'm working on a back-end app where I receive events from users. For each event I call some endpoint foo. If several events come from the same user, each foo call should start only after the previous call for that user has finished (concatMap behaviour). At the same time, calls for different users should run in parallel (mergeMap behaviour).

In the code it looks something like this:

const userEvent$: Observable<{userId: string}> = ...
const foo: (userId: string) => Observable<Response> = ...

let result$: Observable<Response> = ???

Upvotes: 0

Views: 90

Answers (1)

Rafi Henig

Reputation: 6414

Consider using the groupBy operator to group events by userId, then apply concatMap to each group so that events within the same group are processed sequentially, while mergeMap subscribes to all groups in parallel, as demonstrated in the example below:

userEvent$.pipe(
  groupBy(item => item.userId),
  mergeMap(group => group.pipe(concatMap(x => foo(x.userId))))
);

Demo:

const { of, from } = rxjs;
const { delay, tap, concatMap, mergeMap, groupBy } = rxjs.operators;

function foo(userId, index) {
  return of(userId).pipe(
    delay(2000),
    tap(() => console.log("UserId: ", userId, ", Occurrences: ", index + 1))
  )
}


const userEvent$ = from([
  { userId: 1 }, // <- first
  { userId: 2 }, // <- first
  { userId: 2 }, // <- second
  { userId: 3 }, // <- first
  { userId: 1 }, // <- second
  { userId: 1 }, // <- third
  { userId: 2 }, // <- third
  { userId: 3 }, // <- second
]);

userEvent$
  .pipe(
    groupBy(x => x.userId),
    mergeMap(group => group.pipe(concatMap((x,i) => foo(x.userId,i)))),
  )
  .subscribe()
   
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
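For the typed setup in the question, the same groupBy/mergeMap/concatMap pipeline might be wrapped as in the sketch below. This is only an illustration under assumptions: `perUserSequential`, `runDemo`, and `fakeFoo` are hypothetical names, `fakeFoo` is a stand-in for the real endpoint call, and the 20 ms delay is arbitrary. The imports from "rxjs/operators" match RxJS 6 as used in the demo.

```typescript
import { Observable, defer, from, of } from "rxjs";
import { concatMap, delay, groupBy, mergeMap, tap, toArray } from "rxjs/operators";

interface UserEvent {
  userId: string;
}

// Hypothetical helper: applies the pipeline from the answer to any event
// source and any endpoint call.
function perUserSequential<R>(
  userEvent$: Observable<UserEvent>,
  foo: (userId: string) => Observable<R>
): Observable<R> {
  return userEvent$.pipe(
    // One inner Observable (group) per distinct userId.
    groupBy(event => event.userId),
    // Groups are subscribed in parallel; inside a group, calls are queued.
    mergeMap(group$ => group$.pipe(concatMap(event => foo(event.userId))))
  );
}

// Hypothetical demo: records when each fake call starts and ends.
function runDemo(): Promise<string[]> {
  const log: string[] = [];

  // Stand-in for the real endpoint (assumption): emits after 20 ms.
  const fakeFoo = (userId: string): Observable<string> =>
    defer(() => {
      log.push(`start ${userId}`);
      return of(userId).pipe(
        delay(20),
        tap(() => log.push(`end ${userId}`))
      );
    });

  const userEvent$ = from([{ userId: "a" }, { userId: "b" }, { userId: "a" }]);

  return new Promise<string[]>((resolve, reject) => {
    perUserSequential(userEvent$, fakeFoo)
      .pipe(toArray())
      .subscribe({ next: () => resolve(log), error: reject });
  });
}

runDemo().then(log => console.log(log.join(", ")));
```

In the logged sequence, the call for user "b" starts before the first call for "a" has ended, while the second call for "a" starts only after the first one has ended.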

Upvotes: 2
