Reputation: 864
I'm working one a back-end app and I have a situation where I'm getting events from users. For each event I'm going to do a call to some endpoint foo
. If I have several events from the same user, I want to do the next foo
call after previous call to foo
is finished (concatMap behaviour). At the same time, for different users calls have to be done in parallel (mergeMap behaviour).
In the code it looks something like this:
const userEvent$: Observable<{userId: string}> = ...
const foo: (userId: string) => Observable<Response> = ...
let result$: Observable<Response> = ???
Upvotes: 0
Views: 90
Reputation: 6414
Consider utilizing the GroupBy operator, to group events by userId
, next apply concatMap
to the group, so that events within the same group are executed sequentially, as demonstrated in the example below:
userEvent$.pipe(
groupBy(item => item.userId),
mergeMap(group => group.pipe(concatMap(x => foo(x.userId))))
);
Demo:
const { of, from } = rxjs;
const { delay, tap, concatMap, mergeMap, groupBy } = rxjs.operators;
function foo(userId, index) {
return of(userId).pipe(
delay(2000),
tap(() => console.log("UserId: ", userId, ", Occurrences: ", index + 1))
)
}
const userEvent$ = from([
{ userId: 1 }, // <- first
{ userId: 2 }, // <- first
{ userId: 2 }, // <- second
{ userId: 3 },// <- first
{ userId: 1 }, // <- second
{ userId: 1 }, // <- third
{ userId: 2 }, // <- third
{ userId: 3 }, // <- second
]);
userEvent$
.pipe(
groupBy(x => x.userId),
mergeMap(group => group.pipe(concatMap((x,i) => foo(x.userId,i)))),
)
.subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
Upvotes: 2