Reputation: 2336
I've been trying to set up a ventilator / worker / sink pattern in order to crawl pages, but I never got past the testing phase. One particularity of my setup is that the sink lives in the same process as the ventilator. All nodes use ipc:// transport, and for the moment only test messages are exchanged: the ventilator sends tasks, workers receive them, wait, then send a confirmation to the sink.
Symptoms: After some time (generally less than 5 minutes), the sink stops receiving confirmation messages even though the ventilator keeps sending tasks and the workers keep receiving them and sending confirmations.
I know that confirmations are sent because if I restart my sink, it gets all the missing messages on startup.
I thought ZeroMQ dealt with auto-reconnect.
ventilator/sink
var zmq = require('zmq');

var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;

// Bind both endpoints up front; the workers connect to these.
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');

// Send a test task every 2 seconds as a two-frame message.
setInterval(function() {
  push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
  push.send('end');
}, 2000);

sink.on('message', function() {
  var args = [].slice.apply(arguments).map(function(e) { return e.toString(); });
  console.log('got message', args.join(' '));
});
worker.js
var zmq = require('zmq');

// Endpoints bound by the ventilator/sink process (originally passed in via opt).
var opt = { push: 'ipc://crawl.ipc', sink: 'ipc://crawl-sink.ipc' };

var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);

pull.on('message', function() {
  var args = [].slice.apply(arguments).map(function(e) { return e.toString(); });
  console.log('got job ', args.join(' '));
  // Simulate work, then confirm to the sink after up to 5 seconds.
  setTimeout(function() {
    console.log('job done ', args.join(' '));
    sink.send(['job done', args.join(' ')]);
  }, Math.random() * 5 * 1000);
});
EDIT I tried moving the sink to another process and it seems to work. However, I would really like it to live in the same process as the ventilator, and I have observed similar behaviour when dealing with more than one zmq socket per process, regardless of the pattern used.
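For reference, a minimal sketch of what the standalone sink can look like, assuming the same ipc://crawl-sink.ipc endpoint as above:

var zmq = require('zmq');

// Standalone sink process: it now owns the pull endpoint the workers push to.
var sink = zmq.socket('pull');
sink.bind('ipc://crawl-sink.ipc');

sink.on('message', function() {
  var args = [].slice.apply(arguments).map(function(e) { return e.toString(); });
  console.log('got message', args.join(' '));
});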
EDIT I'm using this module https://github.com/JustinTulloss/zeromq.node
Upvotes: 9
Views: 1756
Reputation: 19344
Not sure if you are using the base AMQP client or a package that uses it under the covers; I am having similar issues with RabbitMQ, where the actual process is still running (setInterval keeps firing) but messages stop coming through.
I am running my services/workers via cluster.fork from the main process, with listeners in the main process that re-launch the workers/services upon exit. Inside my worker I have a setInterval that runs every X seconds; if no work was done during that interval, the worker calls process.exit, and the main process listener launches a new fork. This works out as enough resiliency for me: with several workers listening on the queue, work still gets done.
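A minimal sketch of that supervision pattern; IDLE_LIMIT_MS and the lastWork bookkeeping are hypothetical names standing in for the real job-handling logic:

var cluster = require('cluster');

if (cluster.isMaster) {
  // Main process: fork workers and re-launch any that exit.
  for (var i = 0; i < 4; i++) cluster.fork();
  cluster.on('exit', function(worker) {
    console.log('worker ' + worker.process.pid + ' exited, restarting');
    cluster.fork();
  });
} else {
  // Worker: remember when work last happened; exit when idle so the
  // master's 'exit' listener replaces us with a fresh fork.
  var IDLE_LIMIT_MS = 30 * 1000; // hypothetical idle threshold
  var lastWork = Date.now();

  // ...wherever a job is actually handled, refresh: lastWork = Date.now();

  setInterval(function() {
    if (Date.now() - lastWork > IDLE_LIMIT_MS) {
      process.exit(1); // master forks a replacement
    }
  }, IDLE_LIMIT_MS);
}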
As another answer suggested, I've been considering a switch to Axon, since all my interfaces to the MQ currently go through Node. My other systems interface via a NodeJS-driven API service; for that matter, it probably wouldn't be too hard to expose what you need via an API service.
Upvotes: 2
Reputation: 6671
I don't necessarily expect this answer to be accepted, but I'm placing it here for reference. There is a node-only module called Axon that is inspired by ZeroMQ and stays very faithful to its patterns.
Note: ZMQ and Axon are not interoperable.
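For comparison, a minimal push/pull pair in Axon; this is a sketch based on Axon's documented API, and the TCP port is arbitrary:

// producer.js
var axon = require('axon');
var push = axon.socket('push');
push.bind(3000); // arbitrary port for the example
setInterval(function() { push.send('ping'); }, 2000);

// worker.js (separate process)
var axon = require('axon');
var pull = axon.socket('pull');
pull.connect(3000);
pull.on('message', function(msg) {
  console.log('got job', msg.toString());
});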
Upvotes: 4