Reputation: 1592
I need to implement a Priority Queue with node.js
and SQS
. It's my first time with SQS
so I would like to hear your opinions.
I have 3 priorities for the jobs: p0, p1 and p2, p0 being the highest. p2 is the most frequent, after that p0 and last p1. In numbers I could roughly say it's:
p2 ~= 5p0
p0 >> p1
They are all independent jobs so order of performing them is not important.
I came up with 2 possible solutions (these are just sketches that aren't really running).
common to both
var _ = require('lodash'),
Promise = require('bluebird'),
sqs = require('some-sqs-module');
...
module.exports.getJob = function() {
return getJobByQueuePriority(0);
// or...
return getJobByJobPriority(0);
}
solution 1
Use 3 queues, one per priority. Sample 1 job from each queue by their priorities
var Queues = [p0url, p1url, p2url],
currentQueueIndex = 0;
function getJobByQueuePriority(priority) {
return new Promise(function(resolve, reject) {
var queueUrl = getNextQueue(priority);
if(!_.isEmpty(queueUrl)){
sqs.pullOne(queueUrl)
.then(function (job) {
// recursive promises???
return job ? resolve(job) : getJobByQueuePriority(priority + 1);
})
.catch(function (err) {
reject(err);
});
}
});
}
function getNextQueue(index) {
return index >= Queues.length ? '' : Queues[index];
}
solution 2
Use 1 queue and collect k jobs from it, then select the highest ranking job.
var QUEUE_URL = 'some/sqs/url',
JOBS_TO_PULL = 10;
function getJobByJobPriority (priority) {
return new Promise(function (resolve, reject) {
sqs.pullMultiple(QUEUE_URL, JOBS_TO_PULL)
.then(function (jobs) {
var job = getHighestPriorityJob(jobs);
resolve(job);
})
.catch(function (err) {
reject(err);
});
});
}
function getHighestPriorityJob(jobs) {
var highest = jobs[0];
_.each(jobs, function (job) {
if(job.priority < highest.priority){
highest = job;
}
if(highest.priority == HIGHEST_PRIORITY) break;
});
return highest;
}
and here's my wanted consumer
var pq = require('my-priority-queue');
function lookForWork() {
pq.getJob()
.then(function (job) {
job ? work(job) : rest();
});
}
function work(job) {
// do your work...
// search for more work
lookForWork();
}
function rest() {
(setTimeout(function () {
lookForWork();
}, TIME_TO_REST))();
}
it's all sketches so never mind small glitches if you see them.
Upvotes: 3
Views: 768
Reputation: 76
I have implemented a simple priority queue using the sqs-consumer package as follows;
const Consumer = require('sqs-consumer');
const queue = Consumer.create({
...
});
const priorityQueue = Consumer.create({
...
});
priorityQueue.on('message_received', () => {
queue.stop();
});
priorityQueue.on('empty', () => {
queue.start();
});
queue.start();
priorityQueue.start();
Basically, stop the regular queue when the priority receives a message, and start it again when the priority queue is empty.
I've only just implemented it, but here's the effect it had on my two queues (no. messages deleted, blue is the priority queue);
Upvotes: 1
Reputation: 46841
A different queue for each priority is the better way to go imo. Its more scalable (for example you could add more consumers that only process the P2 queue for example if you ever needed to).
With a single queue, every time you pull messages from the queue and don't use them, you are preventing another consumer from seeing those at all until they are returned.
Not sure what your volumes are, and even though SQS is cheap, reading lots of messages and not using them still incurs a charge.
Upvotes: 2