Yoni Levy

Reputation: 1592

Node SQS Priority Queue

I need to implement a Priority Queue with node.js and SQS. It's my first time with SQS so I would like to hear your opinions.

I have 3 priorities for the jobs: p0, p1 and p2, p0 being the highest. p2 is the most frequent, after that p0 and last p1. In numbers I could roughly say it's:

p2 ~= 5p0
p0 >> p1

They are all independent jobs, so the order in which they are performed is not important.

I came up with 2 possible solutions (these are just sketches that aren't really running).

Common to both

var _ = require('lodash'),
    Promise = require('bluebird'),
    sqs = require('some-sqs-module');

...

module.exports.getJob = function() {
    return getJobByQueuePriority(0);
    // or, for solution 2:
    // return getJobByJobPriority(0);
};

Solution 1

Use 3 queues, one per priority. Poll the queues in priority order and take 1 job from the first queue that has one.

var Queues = [p0url, p1url, p2url],
    currentQueueIndex = 0;

function getJobByQueuePriority(priority) {
    var queueUrl = getNextQueue(priority);

    // No queues left to try: resolve with no job so the consumer can rest.
    if (_.isEmpty(queueUrl)) {
        return Promise.resolve(null);
    }

    return sqs.pullOne(queueUrl)
        .then(function (job) {
            // Returning a promise from `then` chains it, so the recursion
            // falls through to the next lower priority queue when this one is empty.
            return job || getJobByQueuePriority(priority + 1);
        });
}

function getNextQueue(index) {
    return index >= Queues.length ? '' : Queues[index];
}
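
The fall-through behaviour of solution 1 can be tried without SQS. This is only a minimal sketch: `fakeQueues` and this `pullOne` are hypothetical in-memory stand-ins (not part of any real SQS module), and it uses the native `Promise` rather than bluebird.

```javascript
// Hypothetical in-memory stand-in for three SQS queues, keyed by URL.
var fakeQueues = {
    p0url: [],
    p1url: [{ id: 'b', priority: 1 }],
    p2url: [{ id: 'c', priority: 2 }]
};

// Hypothetical pullOne: resolves with the next job, or undefined when empty.
function pullOne(queueUrl) {
    return Promise.resolve(fakeQueues[queueUrl].shift());
}

var queueUrls = ['p0url', 'p1url', 'p2url'];

// Try each queue in priority order; resolve with null when all are empty.
function getJobByQueuePriority(index) {
    if (index >= queueUrls.length) {
        return Promise.resolve(null);
    }
    return pullOne(queueUrls[index]).then(function (job) {
        return job || getJobByQueuePriority(index + 1);
    });
}
```

With the stand-in data above, the first call to `getJobByQueuePriority(0)` resolves with the p1 job, because the p0 queue is empty. Note that every empty queue costs one extra request before a lower priority job is found.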

Solution 2

Use 1 queue and collect k jobs from it, then select the highest ranking job.

var QUEUE_URL = 'some/sqs/url', 
    JOBS_TO_PULL = 10;

function getJobByJobPriority(priority) {
    // `priority` is unused here; it only mirrors the signature of solution 1.
    // Note: jobs that are pulled but not selected stay hidden from other
    // consumers until their visibility timeout expires.
    return sqs.pullMultiple(QUEUE_URL, JOBS_TO_PULL)
        .then(getHighestPriorityJob);
}

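// Assumed: p0 jobs carry priority 0, so a lower number means a higher priority.
var HIGHEST_PRIORITY = 0;

function getHighestPriorityJob(jobs) {
    var highest = jobs[0];

    _.each(jobs, function (job) {
        if (job.priority < highest.priority) {
            highest = job;
        }

        // `break` is not valid inside a callback; returning false exits _.each early.
        if (highest.priority === HIGHEST_PRIORITY) return false;
    });

    // undefined when the batch was empty
    return highest;
}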
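
With solution 2, the jobs that were pulled but not selected also need handling. A minimal sketch of splitting a batch into the chosen job and the leftovers, assuming each job carries a numeric `priority` field as in the sketch above (`splitByPriority` is a hypothetical helper name):

```javascript
// Split a batch into the highest priority job (lowest number) and the rest.
function splitByPriority(jobs) {
    if (jobs.length === 0) {
        return { best: null, rest: [] };
    }

    var bestIndex = 0;
    jobs.forEach(function (job, i) {
        if (job.priority < jobs[bestIndex].priority) {
            bestIndex = i;
        }
    });

    return {
        best: jobs[bestIndex],
        // Everything except the chosen job; these should be handed back to the queue.
        rest: jobs.filter(function (job, i) { return i !== bestIndex; })
    };
}
```

The `rest` array is what would otherwise sit hidden in the queue until the visibility timeout runs out.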

And here's the consumer I'd like to end up with:

var pq = require('my-priority-queue');

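function lookForWork() {
    pq.getJob()
        .then(function (job) {
            return job ? work(job) : rest();
        })
        .catch(function (err) {
            // Log and back off instead of leaving the rejection unhandled.
            console.error(err);
            rest();
        });
}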

function work(job) {
    // do your work...

    // search for more work
    lookForWork();
}

function rest() {
    // Wait before polling again. setTimeout returns a timer handle,
    // not a function, so its result must not be invoked.
    setTimeout(lookForWork, TIME_TO_REST);
}

These are all sketches, so never mind small glitches if you see them.

Upvotes: 3

Views: 768

Answers (2)

Alexander

Reputation: 76

I have implemented a simple priority queue using the sqs-consumer package as follows:

const Consumer = require('sqs-consumer');

const queue = Consumer.create({
  ...
});

const priorityQueue = Consumer.create({
  ...
});

priorityQueue.on('message_received', () => {
  queue.stop();
});

priorityQueue.on('empty', () => {
  queue.start();
});

queue.start();
priorityQueue.start();

Basically, stop the regular queue when the priority receives a message, and start it again when the priority queue is empty.

I've only just implemented it, but here's the effect it had on my two queues (number of messages deleted; blue is the priority queue):

[Graph: effect it had on my two queues]

Upvotes: 1

E.J. Brennan

Reputation: 46841

A different queue for each priority is the better way to go, in my opinion. It's more scalable: for example, you could add more consumers that only process the P2 queue if you ever needed to.

With a single queue, every time you pull messages from the queue and don't use them, you are preventing other consumers from seeing those messages at all until they are returned to the queue (that is, until their visibility timeout expires).
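
If you do go with a single queue, unused messages can be handed back early by setting their visibility timeout to 0. A rough sketch, assuming `sqsClient` is an AWS SDK v2 `AWS.SQS` instance (or a stub with the same method) and `messages` are shaped like ReceiveMessage results; `buildReleaseParams` and `releaseUnused` are hypothetical helper names:

```javascript
// Build the parameters for SQS ChangeMessageVisibilityBatch so that
// unused messages become visible to other consumers again.
function buildReleaseParams(queueUrl, messages) {
    return {
        QueueUrl: queueUrl,
        Entries: messages.map(function (message, i) {
            return {
                Id: String(i),                        // only needs to be unique within the batch
                ReceiptHandle: message.ReceiptHandle, // from the ReceiveMessage result
                VisibilityTimeout: 0                  // 0 makes the message visible immediately
            };
        })
    };
}

// `sqsClient` is injected (assumption: AWS SDK v2 style, where request
// objects expose `.promise()`), so the sketch has no hard dependency.
function releaseUnused(sqsClient, queueUrl, messages) {
    if (messages.length === 0) {
        return Promise.resolve();
    }
    return sqsClient
        .changeMessageVisibilityBatch(buildReleaseParams(queueUrl, messages))
        .promise();
}
```

A batch call accepts at most 10 entries, which is enough here because a single receive returns at most 10 messages. Each release is still an extra request, so it reduces the hiding problem but not the cost.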

I'm not sure what your volumes are, but even though SQS is cheap, reading lots of messages and not using them still incurs a charge.

Upvotes: 2
