Reputation: 1429
In my program source code I have the following function (Promise concurrency limitation function, similar to pLimit):
async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}
It works properly: if I pass it an array of numbers [0..99] and multiply each one by 2, it gives the correct output [0..198].
const testArray = Array.from(Array(100).keys());
promiseMapLimit(testArray, 20, async (value) => value * 2).then((result) =>
  console.log(result)
);
Code sample - js playground.
But I can't understand its logic. While debugging, I noticed that it adds promises in chunks of 20 and only after that goes further:
For example, this block of code:
for (const item of array) {
  const p = Promise.resolve().then(() => iteratorFn(item, array));
  ret.push(p);
will iterate over 20 items of an array (why not all 100???)
Same here:
if (poolLimit <= array.length) {
  const e = p.then(() => executing.splice(executing.indexOf(e), 1));
  executing.push(e);
it adds only 20 items to the executing array, and only after that does it step into the if (executing.length >= poolLimit) code block.
I'd be very grateful for the explanation of how this function works.
Upvotes: 6
Views: 1465
Reputation: 813
This answer will be fun to write.
async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}
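So there are three things happening in this piece of code:
1. Promise.resolve().then(() => iteratorFn(item, array))
2. Promise.race
3. p.then(() => executing.splice(executing.indexOf(e), 1))
Ignore 2 for now and let's break down 1 and 3.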
Let's run through a simple scenario.
promiseMapLimit([1, 2, 3], 1, async function (i) { return 2 * i; });
This line calls the above function.
Because of the line const p = Promise.resolve().then(() => iteratorFn(item, array)), by the end of the loop ret holds [Promise(2), Promise(4), Promise(6)].
Because poolLimit (1) is less than or equal to array.length (3), we go into the if block.
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
This line attaches a then handler that runs after the corresponding promise p from the ret array resolves. In this handler, each promise finds its own wrapper e in the executing array and removes it.
It is equivalent to:
const e = p.then(() => {
  // 1. e is kept as a closure variable, so it is still available later, when p resolves
  // 2. find that e in the executing array
  const index = executing.indexOf(e);
  // 3. remove it
  executing.splice(index, 1);
});
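A minimal runnable sketch of the self-removal trick, using a hypothetical track() helper and a sleep() helper (neither is part of the original function). Each wrapper e is pushed onto executing and removes itself when its promise fulfills. Reading e inside its own callback is safe because the callback only runs after the const e = ... assignment has completed.

```javascript
// sleep() is a hypothetical helper: resolves after `ms` milliseconds
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const executing = [];
const snapshots = []; // executing.length right after each removal

// Hypothetical helper with the same shape as the line in promiseMapLimit
function track(p) {
  // e removes itself from executing when p fulfills
  const e = p.then(() => {
    executing.splice(executing.indexOf(e), 1);
    snapshots.push(executing.length);
  });
  executing.push(e);
  return e;
}

const tracked = [track(sleep(30)), track(sleep(10)), track(sleep(20))];
console.log("queued:", executing.length); // queued: 3

Promise.all(tracked).then(() => {
  console.log("after each removal:", snapshots); // after each removal: [ 2, 1, 0 ]
});
```

The shortest timer fulfills first, so its wrapper is the first one removed, regardless of its position in the array.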
Now that you understand 1 and 3, let's go to 2. The loop runs once per element of array, and whenever executing.length >= poolLimit it pauses at await Promise.race(executing); until the first promise in the executing queue resolves.
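To see what await Promise.race(executing) actually waits for, here is a small sketch with three timer-based promises (the delay() helper and the firstToFinish() function are assumptions for illustration). The race settles with the fastest promise, while the others keep running in the background.

```javascript
// delay() is a hypothetical helper: resolves with `value` after `ms` milliseconds
const delay = (ms, value) =>
  new Promise((resolve) => setTimeout(resolve, ms, value));

async function firstToFinish() {
  const pool = [delay(50, "slow"), delay(10, "fast"), delay(30, "medium")];
  const started = Date.now();
  // Resolves as soon as ONE promise in the pool resolves, not all of them
  const winner = await Promise.race(pool);
  console.log(winner, "after roughly", Date.now() - started, "ms");
  return winner;
}

const raceResult = firstToFinish(); // logs "fast" after roughly 10 ms
```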
Let me rewrite the function so it's easier to understand:
async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  if (poolLimit > array.length) {
    // if poolLimit is greater than the array length, just start all the calls;
    // there is no need to worry about concurrency
    return Promise.all(array.map((item) => Promise.resolve().then(() => iteratorFn(item, array))));
  }
  const ret = [];
  const executing = [];
  for (const item of array) {
    // Start the call here, inside the loop, so that at most poolLimit calls are in flight
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    // Add self-removal logic to each promise element
    const e = p.then(() => executing.splice(executing.indexOf(e), 1));
    // Put the promise element in the execution queue
    executing.push(e);
    // Whenever the execution queue is full, wait for a free slot.
    // Promise.race resolves as soon as the FIRST promise in the queue resolves,
    // and because of the self-removal logic that promise has already left the queue
    if (executing.length >= poolLimit) {
      await Promise.race(executing);
    }
  }
  // Promise.all takes care of all the promises remaining in the last batch,
  // for which await Promise.race(executing); was not executed
  return Promise.all(ret);
}
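One way to check the limit empirically is sketched below, assuming that a counter inside iteratorFn is an acceptable measure of concurrency: run the question's original promiseMapLimit over 30 items with a limit of 5 and record the highest number of calls in flight at once. The inFlight and peak counters and the sleep() helper are hypothetical instrumentation.

```javascript
// promiseMapLimit copied unchanged from the question
async function promiseMapLimit(array, poolLimit, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

let inFlight = 0; // calls that have started but not finished yet
let peak = 0; // highest value inFlight ever reached

const run = promiseMapLimit(Array.from(Array(30).keys()), 5, async (value) => {
  inFlight += 1;
  peak = Math.max(peak, inFlight);
  await sleep(5 + Math.random() * 20); // simulated async work
  inFlight -= 1;
  return value * 2;
});

run.then((result) => {
  console.log("peak concurrency:", peak); // peak concurrency: 5
  console.log("results:", result.length); // results: 30
});
```

Because inFlight is decremented before a call's promise resolves, and a new call can only be added after a wrapper has left executing, peak should never exceed poolLimit.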
Upvotes: 0
Reputation: 113365
Very interesting question! I think the important part of the code here is Promise.race(...), which resolves as soon as one of the promises resolves.
I have added a sleep function with a random delay (between 3 and 6 seconds) to better visualize how this works.
The expected behavior is that we always want 20 promises executing in parallel, and once one finishes, the next one in the queue starts.
Visually, for a limit of 3 and 10 promises, it would look like the chart below. Notice that at every moment in time there are 3 active promises (except at the end, when the queue runs out):
ID  | Start ... End
0   [====]
1   [==]
2   [======]
3     [==========]
4       [====]
5         [================]
6           [==]
7             [====]
8               [======]
9                 [========]
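The chart can also be reproduced with a small discrete simulation. This is a sketch, not the original code: simulatePool is a hypothetical helper that assumes each = is one time unit and that a freed slot is taken by the next queued task at the same instant, which is what await Promise.race(executing) achieves in the real function.

```javascript
// Hypothetical helper: returns the start time of each task when at most
// `limit` tasks run at once and the next task starts as soon as a slot frees up
function simulatePool(durations, limit) {
  const starts = [];
  let running = []; // end times of the tasks currently running
  let now = 0;
  durations.forEach((duration, id) => {
    if (running.length >= limit) {
      // Jump to the earliest end time (the role Promise.race plays)
      now = Math.min(...running);
      // Every task that has ended by now frees its slot
      running = running.filter((end) => end > now);
    }
    starts[id] = now;
    running.push(now + duration);
  });
  return starts;
}

// Bar lengths taken from the chart: 10 tasks, limit of 3
const durations = [4, 2, 6, 10, 4, 16, 2, 4, 6, 8];
const starts = simulatePool(durations, 3);

// Draw one row per task, indented by its start time
starts.forEach((start, id) => {
  console.log(`${id}   ${" ".repeat(start)}[${"=".repeat(durations[id])}]`);
});
console.log(starts.join(", ")); // 0, 0, 0, 2, 4, 6, 8, 10, 12, 14
```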
The code below adds the random delay, plus some logging:
// Create the utility sleep function
const sleep = x => new Promise(res => setTimeout(res, x))
async function promiseMapLimit(array, poolLimit, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    console.log(ret.length)
    if (poolLimit <= array.length) {
      const e = p.then(() => executing.splice(executing.indexOf(e), 1));
      executing.push(e);
      if (executing.length >= poolLimit) {
        console.log(`Running batch of ${executing.length} promises.`);
        await Promise.race(executing);
        // As soon as one of the promises finishes, we continue the loop.
        console.log("Resolved one promise.")
      }
    }
  }
  return Promise.all(ret);
}
const testArray = Array.from(Array(100).keys());
promiseMapLimit(testArray, 20, async (value) => {
  // Log
  console.log(`Computing iterator fn for ${value}`)
  await sleep(3000 + Math.random() * 3000);
  return value * 2
}).then((result) =>
  console.log(result)
);
You asked why it "will iterate over 20 items of an array (why not all 100???)".
At the start, as in the chart, it does not iterate over all 100 items, only over the first 20; then the loop is paused by await Promise.race(...), because executing.length >= poolLimit becomes true after 20 items have been added.
Once a promise is fulfilled, it removes itself from the executing array via executing.splice(executing.indexOf(e), 1).
I think things become clearer with a delay (await sleep(...)), which simulates a real async operation (such as a database request).
Please let me know if there is anything else unclear.
Upvotes: 5
Reputation: 2734
Mr. Hedgehog's answer does explain the important parts. I tried to explain the function using inline comments; maybe this helps.
async function promiseMapLimit(
  array,
  poolLimit,
  iteratorFn,
) {
  // array for storing results / result promises
  const ret = [];
  // array holding the currently executing promises
  const executing = [];
  // iterate over array
  for (const item of array) {
    // Create a new promise from an already-resolved one.
    // This is roughly like writing:
    // const p = new Promise((res, rej) => iteratorFn(item, array).then(res, rej))
    // but this version also allows iteratorFn to be NON-async,
    // and it does not call iteratorFn right away (see below)
    const p = Promise.resolve().then(
      // Since Promise.resolve() is already resolved, this callback is executed
      // not immediately, but almost immediately:
      // under the hood, .then() callbacks are put on the microtask queue, which only
      // runs once the currently executing synchronous code has finished.
      // So this function body will, for example, be executed after `ret.push(p)`!
      () => iteratorFn(item, array)
    );
    // store the created promise in the results array anyway
    ret.push(p);
    // If the array holds fewer elements than poolLimit, nothing has to be done
    // here and all elements can be executed directly.
    if (poolLimit <= array.length) {
      // If we get in here, the array has at least as many elements as poolLimit.
      // This line adds a success handler to the promise. It basically
      // removes itself from the executing array at the point it finishes.
      const e = p.then(() =>
        // here you see it search for itself and remove itself from the array using splice
        executing.splice(executing.indexOf(e), 1)
      );
      // Add the promise to the currently executing ones.
      // Note that this line is executed before "executing.splice(executing.indexOf(e), 1)"
      executing.push(e);
      // And the final, smart part:
      // if the executing array holds fewer elements than the maximum allowed,
      // do nothing!
      if (executing.length >= poolLimit) {
        // We got here, so the executing queue is full (poolLimit promises are running).
        // Promise.race() waits for the first promise in the given array to resolve,
        // so execution goes on at the moment any promise resolves, and not before.
        // Since the promises remove themselves from the executing array, we know one
        // promise has finished and executing has a free slot.
        await Promise.race(executing);
      }
    }
  }
  // Since all promises were looped over and added to the results array,
  // it's now possible to await them all.
  return Promise.all(ret);
}
To understand this topic, you may want to learn about the JavaScript event loop and call stack: https://www.educative.io/edpresso/what-is-an-event-loop-in-javascript
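A small sketch of the deferral described in the comments above (the schedule() function and the events array are hypothetical): a .then() callback on an already-resolved promise runs only after the surrounding synchronous code, including the code in the caller, has finished.

```javascript
const events = [];

// Hypothetical stand-in for the loop: queue a .then() callback, then push
function schedule(items) {
  for (const item of items) {
    // Queued as a microtask; does NOT run yet
    Promise.resolve().then(() => events.push(`call ${item}`));
    // Runs synchronously, like ret.push(p)
    events.push(`push ${item}`);
  }
  events.push("loop finished");
}

schedule([0, 1, 2]);
events.push("caller continues");

// A timer callback runs only after all queued microtasks have run
setTimeout(() => console.log(events.join(", ")), 0);
// push 0, push 1, push 2, loop finished, caller continues, call 0, call 1, call 2
```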
Upvotes: 0
Reputation: 2885
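You have await inside an async function. This works roughly as follows:
1. the function body runs synchronously until it reaches the await keyword;
2. at the await keyword, the function pauses and control goes back to the caller; the function resumes only after the awaited promise settles.
In your case, it iterates 20 times, then pauses once you hit the limit. Then, once at least one of the promises in executing has resolved, it proceeds.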
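The pause-and-resume behavior of await can be seen in isolation with this sketch. loopWithAwait is a hypothetical stand-in for the loop in promiseMapLimit, and await Promise.resolve() stands in for await Promise.race(executing); the point is only the order in which things happen.

```javascript
const log = [];

// Hypothetical stand-in for the loop in promiseMapLimit
async function loopWithAwait(items, limit) {
  let count = 0; // plays the role of executing.length
  for (const item of items) {
    log.push(`loop ${item}`);
    count += 1;
    if (count >= limit) {
      log.push("pause");
      // Stand-in for `await Promise.race(executing)`: the function is
      // suspended here and control goes back to whoever called it
      await Promise.resolve();
      log.push("resume");
      count -= 1; // one slot was freed
    }
  }
}

const done = loopWithAwait([0, 1, 2, 3], 2);
// Runs as soon as the function hits its first await
log.push("caller continues");

done.then(() => console.log(log.join(" | ")));
// loop 0 | loop 1 | pause | caller continues | resume | loop 2 | pause | resume | loop 3 | pause | resume
```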
The next thing that happens is that each promise, once resolved, removes itself from the executing array. But since almost everything happens instantaneously, what you see is that it resolves all 20 promises and then fills the pool with another 20. If you make your iteratorFn slower with random delays, you'll see that the pool is constantly filled up to 20: each freed slot is almost immediately taken by a new promise, as long as there are elements left.
Let's replace your iteratorFn with this and call it:
let iter = async (value) => {
  // randomly delay each calculation by 1, 2 or 3 seconds
  return new Promise(resolve => setTimeout(resolve, [1000, 2000, 3000][Math.floor(Math.random() * 3)], value * 2))
}
promiseMapLimit(testArray, 20, iter).then((result) =>
  console.log(result)
);
And let's log the number of elements left in executing each time a promise resolves:
if (poolLimit <= array.length) {
  const e = p.then(() => {
    executing.splice(executing.indexOf(e), 1);
    // logging what is left
    console.log({l: executing.length})
  });
  executing.push(e);
  if (executing.length >= poolLimit) {
    await Promise.race(executing);
  }
}
This way, in the console you will see that the logging starts with {l: 19}, since the pool is filled up and then one promise resolves. It continues like this until the very end, where the log goes from 19 down to 0.
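A sketch of the same experiment that finishes quickly: it assumes short, deterministic delays (10, 20 or 30 ms derived from the value, chosen only for speed) instead of 1 to 3 random seconds, and collects the lengths into a hypothetical lengths array instead of logging them. The question's promiseMapLimit is copied with one extra line.

```javascript
const lengths = []; // executing.length right after each self-removal

async function promiseMapLimit(array, poolLimit, iteratorFn) {
  const ret = [];
  const executing = [];
  for (const item of array) {
    const p = Promise.resolve().then(() => iteratorFn(item, array));
    ret.push(p);
    if (poolLimit <= array.length) {
      const e = p.then(() => {
        executing.splice(executing.indexOf(e), 1);
        lengths.push(executing.length); // what is left in the pool
      });
      executing.push(e);
      if (executing.length >= poolLimit) {
        await Promise.race(executing);
      }
    }
  }
  return Promise.all(ret);
}

// Delay of 10, 20 or 30 ms depending on the value; resolves with value * 2
const iter = (value) =>
  new Promise((resolve) => setTimeout(resolve, 10 + (value % 3) * 10, value * 2));

const testArray = Array.from(Array(100).keys());
const finished = promiseMapLimit(testArray, 20, iter).then((result) => {
  console.log(lengths[0], lengths[lengths.length - 1]); // 19 0
  return result;
});
```

The first entry is 19 because all 20 calls are queued before any timer fires, and the last entry is 0 because the final removal empties the pool.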
Upvotes: 3