Reputation: 18919
What is the best way to continuously process items added to the queue? I see the following method
queue.process
https://github.com/Automattic/kue#processing-jobs
but this will process and return. Items added after this call are obviously not processed.
I thought to do:
queue.on('job enqueue', function(id, type){
queue.process('email', function (job, done) {
console.log('processing one: ' + job.id);
done(null);
});
});
But not sure if this fires multiple process methods ignoring the current queue status?
EDIT:
I have created a handler to listen to the 'email' type, yet it is only called once given the scenario below. Unless I am missing something, I would expect process to be run exactly 10 times here?
const queue = kue.createQueue();
queue.process('email', function (job, done) {
email(job.id, job.data, done);
});
var email = function(id, email, done) {
console.log('job: %s, sent to: %s number: %s', id, email.to, email.number);
done(null, {result: 'OK'});
};
queue
.on('job enqueue', function (id, type) {
console.log('job %s got queued of type %s with id %s', id, type);
})
.on('job complete', function (id, result) {
console.log('job complete: ' + id);
});
for (var i = 0; i < 10; i++) {
queue
.create('email', {
title: 'welcome email for tj',
number: i,
to: '[email protected]',
template: 'welcome-email'
})
.removeOnComplete(true)
.save();
}
Upvotes: 3
Views: 2692
Reputation: 203359
The function passed to queue.process
will get called for each enqueued job.
Think of it as an event handler: it "listens" for (in this case) "email" events/jobs, and for each one the handler function will be called. So it's incorrect that "items added after this call are obviously not processed".
As per the fine manual, "by default a call to queue.process()
will only accept one job at a time for processing", but you can increase the concurrency:
queue.process('email', 20, function(job, done){
// ...
});
Some example code that shows the handler gets called for each new job:
const kue = require('kue');
const queue = kue.createQueue();
queue.process('email', function(job, done){
console.log('got job', job.data);
done();
});
setInterval(() => {
queue.create('email', {
timestamp : new Date()
}).save();
}, 1000);
Upvotes: 4