Floby

Reputation: 2336

ZeroMQ with node.js pipeline sink stops receiving messages after a while

I've been trying to set up a ventilator / worker / sink pattern in order to crawl pages, but I never got past the testing phase. One particularity of my setup is that the sink lives in the same process as the ventilator. All nodes use the ipc:// transport, and for the moment only test messages are exchanged: the ventilator sends tasks, workers receive them, wait a random delay, and then send a confirmation to the sink.

Symptoms: After some time (generally less than 5 minutes) the sink stops receiving confirmation messages, even though the ventilator keeps on sending tasks and workers keep on receiving them and sending confirmation messages.

I know that confirmations are sent because if I restart my sink, it gets all the missing messages on startup.

I thought ZeroMQ dealt with auto-reconnect.

ventilator/sink

var zmq = require('zmq');

var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;
setInterval(function() {
    // send a multipart task: 'ping', a counter, then a final 'end' frame
    push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
    push.send('end');
}, 2000);
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');
sink.on('message', function() {
    // each frame arrives as a Buffer argument; convert them all to strings
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got message', args.join(' '));
});
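The frame handling in the `message` handler above can be factored into a small helper (the name `framesToStrings` is mine, not the module's): zeromq.node delivers each frame of a multipart message as a separate Buffer argument, so the handler converts them all to strings before use.

```javascript
// Hypothetical helper extracted from the sink's handler: convert the
// Buffer frames zeromq.node passes to a 'message' handler into strings.
function framesToStrings(args) {
  return [].slice.call(args).map(function (frame) {
    return frame.toString();
  });
}

// Inside a handler it would be used as:
//   sink.on('message', function() {
//       var args = framesToStrings(arguments);
//       console.log('got message', args.join(' '));
//   });
console.log(framesToStrings([Buffer.from('ping'), Buffer.from('0')]));
```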

worker.js

var zmq = require('zmq');
// opt carries the endpoints bound by the ventilator/sink process:
// { push: 'ipc://crawl.ipc', sink: 'ipc://crawl-sink.ipc' }

var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);

pull.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got job ', args.join(' '));
    // simulate work, then confirm to the sink after a random delay (0-5s)
    setTimeout(function() {
        console.log('job done ', args.join(' '));
        sink.send(['job done', args.join(' ')]);
    }, Math.random() * 5 * 1000);
});

EDIT: I tried moving the sink to another process and it seems to work. However, I would really like it to live in the same process. I have observed similar behaviour whenever I use more than one ZeroMQ socket in a single process, regardless of the pattern used.

EDIT: I'm using this module: https://github.com/JustinTulloss/zeromq.node

Upvotes: 9

Views: 1756

Answers (2)

Tracker1

Reputation: 19344

I'm not sure if you are using the base AMQP client or a package that wraps it; I'm having similar issues with RabbitMQ. The actual process is still running (setInterval keeps firing).

I run my services/workers via cluster.fork from the main process. Listeners in the main process re-launch the workers/services on exit. Inside each worker I have a setInterval that runs every X seconds; if no work was done during that interval, the worker calls process.exit (and the main process listener forks a replacement). This gives me enough resiliency: with several workers listening on the queue, work still gets done.
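The idle-watchdog pattern described above can be sketched as follows (names like `makeWatchdog` are illustrative, not from any library): the worker records when it last did work, and a periodic check exits the process when it has been idle too long, relying on the master's exit listener to fork a replacement.

```javascript
// Track the time of the last completed job; `now` is injectable for testing.
function makeWatchdog(idleLimitMs, now) {
  now = now || Date.now;
  var lastWork = now();
  return {
    touch: function () { lastWork = now(); },                    // call on every job
    isStale: function () { return now() - lastWork > idleLimitMs; }
  };
}

// Wiring inside a worker (left as comments so the sketch stays self-contained):
//   var dog = makeWatchdog(30 * 1000);
//   queue.on('job', function (job) { dog.touch(); /* do the work */ });
//   setInterval(function () {
//     if (dog.isStale()) process.exit(1);   // master's 'exit' listener re-forks
//   }, 5000);
```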

As another answer suggested, I've been considering a switch to Axon, since all my interfaces to the MQ currently go through Node. My other systems interface via a NodeJS-driven API service. For that matter, it probably wouldn't be too hard to expose whatever you need via an API service.

Upvotes: 2

Jacob Groundwater

Reputation: 6671

I don't necessarily expect this answer to be accepted, but I'm placing it here for reference. There is a pure-Node module called Axon, which is inspired by ZeroMQ.

  • Axon has no compiled dependencies, and re-creates the same socket types as ZeroMQ.
  • Axon also builds upon the pub/sub socket type to create a network event-emitter.
  • Finally, ZMQ's req/rep socket does not work well from Node.js because ZMQ expects the reply to occur synchronously. Being pure Node, the Axon library handles the req/rep pattern properly.

Note: ZMQ and Axon are not interoperable.

Upvotes: 4
