Reputation: 219938
Is there any way to consume the result of a mergeMap
in the original order of the outer observable, while still allowing the inner observables to run in parallel?
Let's look at two merge-mapping operators:
...which takes a mapping callback, and a number of how many inner observables may run concurrently:
of(1, 2, 3, 4, 5, 6).pipe(
mergeMap(number => api.get('/double', { number }), 3)
);
See it here in action: https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010
This will fire off 3 parallel requests for 1
, 2
and 3
, respectively. As soon as one of those requests completes, it'll fire off another request for 4
. And so on and so forth, always maintaining 3 concurrent requests, until all of the values have been processed.
However, since previous requests may complete before subsequent requests, the values produced may be out of order. So instead of:
[2, 4, 6, 8, 10, 12]
...we may actually get:
[4, 2, 8, 10, 6, 12] // or any other permutation
...enter concatMap
. This operator ensures that the observables are all concatenated in the original order, so that:
of(1, 2, 3, 4, 5, 6).pipe(
concatMap(number => api.get('/double', { number }))
);
...will always produce:
[2, 4, 6, 8, 10, 12]
See it here in action: https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010
This is what we want, but now the requests won't run in parallel. As the documentation says:
concatMap
is equivalent tomergeMap
withconcurrency
parameter set to1
.
So back to the question: is it possible to get the benefits of mergeMap
, whereby a given amount of requests can be run in parallel, while still having the mapped values be emitted in the original order?
The above described the problem in abstract. It is sometimes easier to reason about a problem when you know the actual problem at hand, so here goes:
I have a list of orders that have to be shipped:
const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
I have a shipOrder
method that actually ships the orders. It returns a Promise
:
const shipOrder = orderNumber => api.shipOrder(orderNumber);
The API can only process up to 5 order shipments simultaneously, so I'm using mergeMap
to handle that:
from(orderNumbers).pipe(
mergeMap(orderNumber => shipOrder(orderNumber), 5)
);
After an order is shipped, we need to print its shipping label. I have a printShippingLabel
function that, given the order number of a shipped order, will print its shipping label. So I subscribe to our observable, and print the shipping labels as the values come in:
from(orderNumbers)
.pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
.pipe(orderNumber => printShippingLabel(orderNumber));
This works, but now the shipping labels are printed out of order, since mergeMap
emits values based on when shipOrder
completes its request. What I want is for the labels to print in the same order as the original list.
Is that possible?
See here for a visualization of the problem: https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010
You can see that earlier orders are being printed before later orders are even shipped.
Upvotes: 9
Views: 4678
Reputation: 180
Take a look at this solution parallel calls with sequential loading using mergeMap
and scan
operatos
posts$: Subject<Post[]> = new Subject();
fetchPostsByIds() {
const postsIds = [1,2,3,4,5];
let sequenceNumber = 0;
const posts:any = [];
from(postsIds).pipe(
mergeMap((postId: number, index: number) =>
this.http.get(`${this.host}/posts/${postId}`).pipe(
map(post => {
return { post, index }
})
),
scan((acc: Post[], post: any) => [ ...acc, post], [])
).subscribe((result:any[]) => {
let sequentialPost = result.find((post:any) => post.index === sequenceNumber);
while(sequentialPost) {
sequenceNumber++;
posts.push(sequentialPost.post)
console.log("result =", posts)
this.posts$.next(posts);
sequentialPost = result.find((post:any) => post.index === sequenceNumber);
}
})
This will log the result in sequntial order
Upvotes: 0
Reputation: 21638
Use a scan to buffer up the results and emit them in order when they are available.
const { delay, map, mergeMap, of, scan, switchMap } = rxjs;
const api = {
get: (url, data) => of(data.number * 2).pipe(delay(Math.random() * 500))
};
of(1, 2, 3, 4, 5, 6).pipe(
mergeMap((number, index) => api.get('/double', { number }).pipe(
map(response => ({ response, index })) // We need to keep the index for ordering
), 3),
scan(
(acc, response) => {
acc.emit = [];
let index = acc.responses.findIndex(r => r.index > response.index);
if (index === -1) {
index = acc.responses.length;
}
acc.responses.splice(index, 0, response);
while (acc.current === acc.responses[0]?.index) {
acc.emit.push(acc.responses.shift());
acc.current++;
}
return acc;
},
{ current: 0, emit: [], responses: [] }
),
switchMap(acc => of(...acc.emit.map(r => r.response)))
).subscribe(response => {
console.log(response);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
Upvotes: 0
Reputation: 11
replaced concatMap((v) => of(v).pipe(delayWhen(() => pipeNotifier))),
with zipWith
zipWith(subject),
map(...do async stuff...),
concatAll()
request 1
request 2
request 3
response 3
request 4
response 1
request 5
1
response 4
request 6
response 2
request 7
2
3
4
response 6
request 8
response 5
request 9
5
6
response 7
request 10
7
response 9
response 10
response 8
8
9
10
https://stackblitz.com/edit/js-fpds79
import { range, Subject, from, zipWith } from 'rxjs';
import { share, map, concatAll } from 'rxjs/operators';
const pipeNotifier = new Subject();
range(1, 10)
.pipe(
// 1. Make Observable controlled by pipeNotifier
zipWith(pipeNotifier),
// 2. Submit the request
map(([v]) =>
from(
(async () => {
console.log('request', v);
await wait();
console.log('response', v);
pipeNotifier.next();
return v;
})()
)
),
// 3. Keep order
concatAll()
)
.subscribe((x) => console.log(x));
// pipeNotifier controler
range(0, 3).subscribe(() => {
pipeNotifier.next();
});
function wait() {
return new Promise((resolve) => {
const random = 5000 * Math.random();
setTimeout(() => resolve(random), random);
});
}
Upvotes: 1
Reputation: 13071
You could use this operator: sortedMergeMap
, example.
const DONE = Symbol("DONE");
const DONE$ = of(DONE);
const sortedMergeMap = <I, O>(
mapper: (i: I) => ObservableInput<O>,
concurrent = 1
) => (source$: Observable<I>) =>
source$.pipe(
mergeMap(
(value, idx) =>
concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
concurrent
),
scan(
(acc, [value, idx]) => {
if (idx === acc.currentIdx) {
if (value === DONE) {
let currentIdx = idx;
const valuesToEmit = [];
do {
currentIdx++;
const nextValues = acc.buffer.get(currentIdx);
if (!nextValues) {
break;
}
valuesToEmit.push(...nextValues);
acc.buffer.delete(currentIdx);
} while (valuesToEmit[valuesToEmit.length - 1] === DONE);
return {
...acc,
currentIdx,
valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
};
} else {
return {
...acc,
valuesToEmit: [value]
};
}
} else {
if (!acc.buffer.has(idx)) {
acc.buffer.set(idx, []);
}
acc.buffer.get(idx)!.push(value);
if (acc.valuesToEmit.length > 0) {
acc.valuesToEmit = [];
}
return acc;
}
},
{
currentIdx: 0,
valuesToEmit: [] as O[],
buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
}
),
mergeMap(scannedValues => scannedValues.valuesToEmit)
);
Upvotes: 1
Reputation: 89
What you want is this:
from(orderNumbers)
.pipe(map(shipOrder), concatAll())
.subscribe(printShippingLabel)
Explanation:
The first operator in the pipe is map. It calls shipOrder for each value immediately (so subsequent values may start parallel requests).
The second operator, concatAll, puts the resolved values in proper sequence.
(I simplified the code; concatAll() is equivalent to concatMap(identity).)
Upvotes: 0
Reputation: 219938
I did manage to partially solve it, so I'm posting it here as an answer to my own question.
I still very much want to know the canonical way to handle this situation.
Create a custom operator that takes values that have an index key ({ index: number }
in Typescript parlance), and keeps a buffer of the values, only emitting them according to their index
's order.
Map the original list into a list of objects with their index
embedded.
Pass that onto our custom sortByIndex
operator.
Map the values back into their original values.
Here's what that sortByIndex
would look like:
function sortByIndex() {
return observable => {
return Observable.create(subscriber => {
const buffer = new Map();
let current = 0;
return observable.subscribe({
next: value => {
if (current != value.index) {
buffer.set(value.index, value);
} else {
subscriber.next(value);
while (buffer.has(++current)) {
subscriber.next(buffer.get(current));
buffer.delete(current);
}
}
},
complete: value => subscriber.complete(),
});
});
};
}
With the sortByIndex
operator in place, we can now complete our whole pipeline:
of(1, 2, 3, 4, 5, 6).pipe(
map((number, index) => ({ number, index })),
mergeMap(async ({ number, index }) => {
const doubled = await api.get('/double', { number });
return { index, number: doubled };
}, 3),
sortByIndex(),
map(({ number }) => number)
);
See it here in action: https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010
concurrentConcat
operatorIn fact, with this sortByIndex
operator in place, we can now create a general concurrentConcat
operator, which will do the transformations to and from the { index: number, value: T }
type internally:
function concurrentConcat(mapper, parallel) {
return observable => {
return observable.pipe(
mergeMap(
mapper,
(_, value, index) => ({ value, index }),
parallel
),
sortByIndex(),
map(({ value }) => value)
);
};
}
We can then use this concurrentConcat
operator instead of mergeMap
, and it will now emit the values in their original order:
of(1, 2, 3, 4, 5, 6).pipe(
concurrentConcat(number => api.get('/double', { number }), 3),
);
See it here in action: https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010
So to solve my original problem with the order shipments:
from(orderNumbers)
.pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
.subscribe(orderNumber => printShippingLabel(orderNumber));
See it here in action: https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010
You can see that even though later orders might end up being shipped before earlier ones, the labels are always printed in their original order.
This solution is not even complete (since it doesn't handle inner observables that emit more than one value) yet it requires a bunch of custom code. This is such a common problem, that I feel there has to be an easier (built-in) way to solve this :|
Upvotes: 5