Reputation: 1022
I'm running across a bug with a custom asynchronous queue that calls 10 async functions at a time.
I'm initiating the queue with 50 jobs, once first 10 jobs are finished the queue moves to the subsequent 10 until it finishes all.
The bug I'm coming across is that once it finishes 50, it restarts with first 5 jobs with 2 or 3 or 1 job at a time. It also takes fewer than 10 jobs towards the end of the queue.
Please create these two files and test with mocha and see the output yourself.
Note: Set the timeout in mocha to 0 to keep the test running for prolonged period of time.
Queue.js
function Queue(func, max) {
this.jobs = [];
this.func = func;
this.max = max ? max : 10;
}
Queue.prototype.push = function(data) {
var self = this;
return new Promise(function(resolve, reject){
self.jobs.push({data: data, resolve: resolve, reject: reject});
if(!self.progress) {
self.progress = true;
self.run();
}
});
};
Queue.prototype.run = function() {
var self = this;
var tasks = [];
console.log("--------------------");
for(var i=0; i<this.jobs.length && i < this.max; i++) {
tasks.push(this.jobs.shift());
console.log("queuing", tasks[tasks.length-1].data);
}
console.log("Total jobs queued", tasks.length);
Promise.all(
tasks.map(function(task){
return self.func(task.data)
.then(task.resolve, task.reject);
}
)).then(this.next.bind(this));
};
Queue.prototype.next = function(){
if(this.jobs.length) {
this.run();
} else {
this.progress = false;
}
};
module.exports = Queue;
QueueTest.js
function async(data) {
return new Promise(function(resolve, reject){
setTimeout(function(){
console.log("resolving", data);
resolve(data);
}, Math.random() * 5000);
});
}
it("should test queue", function(done){
var queue = new Queue(async);
Promise.all(
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50].map(queue.push.bind(queue))
).then(function(){
done();
});
});
Upvotes: 3
Views: 5173
Reputation: 5738
const fastQueue = async <T, Q>(
x: T[],
threads: number,
fn: (v: T, i: number, a: T[]) => Promise<Q>
) => {
let k = 0;
const result = Array(x.length) as Q[];
await Promise.all(
[...Array(threads)].map(async () => {
while (k < x.length) result[k] = await fn(x[k], k++, x);
})
);
return result;
};
const demo = async () => {
const wait = (x: number) => new Promise(r => setTimeout(r, x, x))
console.time('a')
console.log(await fastQueue([1000, 2000, 3000, 2000, 2000], 4, (v) => wait(v)))
console.timeEnd('a')
}
demo();
Upvotes: 0
Reputation: 514
The issue is this condition in the for
loop: i<this.jobs.length
i
is counting the number of jobs scheduled in the batch. This condition is correct when i
is an index in the jobs array. In this case, we just want to confirm there are still jobs left to process, so we can simply use: this.jobs.length>0
The odd behavior towards the end of the queue is because the length is falling as elements are shifted off the jobs array, but the number of jobs scheduled in that batch (i
) is increasing. Let's take an example where this.jobs.length
is 4 when entering the for loop:
i | this.jobs.length | i < this.jobs.length |
---|---|---|
0 | 4 | true |
1 | 3 | true |
2 | 2 | false |
In this case, the loop is exited after only scheduling 2 of the four jobs in the queue. Checking if there are remaining tasks instead fixes the issue:
for(var i=0; this.jobs.length > 0 && i < this.max; i++) {
Upvotes: 0
Reputation: 19288
The problem lies in the for
loop in Queue.prototype.run
.
I can't immediately see why it should misbehave the way it does but a fix is to replace the for
loop with self.jobs.splice()
to create the tasks
array.
Queue.prototype.run = function() {
console.log("--------------------");
var self = this;
var tasks = self.jobs.splice(0, self.max); // <<<<<<<< this is the fix
console.log("tasks:", tasks.map(obj => obj.data));
Promise.all(
tasks.map(function(task){
return self.func(task.data)
.then(task.resolve, task.reject);
}
)).then(this.next.bind(this));
};
Nothing else needs to change.
Upvotes: 2