Reputation: 46613
I'm using postwait/node-amqp
(link) to connect to a variety of RabbitMQ exchanges and queues in our organization.
As my project has moved from dev to production, I've encountered several issues with queues not being set up correctly, passwords being incorrect, etc. In the latter case it's obvious: I'll get an ECONNREFUSED error. In the former case, though, I don't get any errors, just a timeout on the connect.
Given a URI like amqp://USER:PASSWORD@HOSTNAME
how can I determine if a queue called "FooWorkItems.Work" is accepting connections for listening? What's the bare minimum code for this — the equivalent of checking whether an API is listening, or whether a server is up and listening on the ping port?
Code:
// Connects to an AMQP broker at this.endpoint and, once the connection is
// ready, opens the queue named by _this.queueName and subscribes to it.
// Load the 'amqp' module only if this.amqpLib has not already been assigned.
if (this.amqpLib == null) {
this.amqpLib = require('amqp');
}
// Open the connection, passing this.endpoint as the URL.
this.connection = this.amqpLib.createConnection({
url: this.endpoint
});
// Everything below runs only after the connection emits 'ready'. The outer
// wrapper function receives the enclosing `this` as _this.
this.connection.on('ready', (function(_this) {
return function() {
var evt, _fn, _fn1, _i, _j, _len, _len1, _ref, _ref1;
// NOTE(review): queueName is a bare identifier here but _this.queueName is
// used for the queue() call below — confirm both refer to the same value.
_this.logger.info("" + _this.stepInfo + " connected to " + _this.endpoint + "; connecting to " + queueName + " now.");
// Extra event listeners are attached only when fullLogging is enabled.
// NOTE(review): because the 'error' listener is registered here, inside the
// 'ready' handler and behind fullLogging, nothing in this snippet listens
// for errors that occur before 'ready' fires.
if (_this.fullLogging) {
// Informational events: logged at trace level along with their arguments.
_ref = ['connect', 'heartbeat', 'data'];
// Wrapper function gives each listener its own binding of evt.
_fn = function(evt) {
return _this.connection.on(evt, function() {
_this.logger.trace("" + _this.stepInfo + " AMQP event: " + evt);
// `arguments` is always an object inside a function, so this check always passes.
if (arguments != null) {
return _this.logger.trace({
args: arguments
});
}
});
};
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
evt = _ref[_i];
_fn(evt);
}
// Problem events: 'close' is logged as a warning, the rest as errors.
// The event arguments (e.g. the error object) are not logged here.
_ref1 = ['error', 'close', 'blocked', 'unblocked'];
_fn1 = function(evt) {
return _this.connection.on(evt, function() {
if (evt !== 'close') {
return _this.logger.error("" + _this.stepInfo + " AMQP event: " + evt);
} else {
return _this.logger.warn("" + _this.stepInfo + " AMQP event: " + evt);
}
});
};
for (_j = 0, _len1 = _ref1.length; _j < _len1; _j++) {
evt = _ref1[_j];
_fn1(evt);
}
}
// Open the queue with passive: true; the callback receives the queue object.
// NOTE(review): passive presumably makes the broker only check that the queue
// exists instead of creating it — confirm against the node-amqp docs.
return _this.connection.queue(_this.queueName, {
passive: true
}, function(q) {
// NOTE(review): logger, stepInfo and queueName are bare identifiers in this
// callback, unlike the _this.-qualified uses above — verify they are defined
// in an enclosing scope not shown here.
logger.debug("" + stepInfo + " connected to queue " + queueName + ". Init complete.");
// Subscribe to the queue; each message is only logged (trace level, headers only).
return q.subscribe(function(message, headers, deliveryInfo, messageObject) {
logger.trace("" + stepInfo + " recvd message");
return logger.trace({
headers: headers
});
});
});
};
// NOTE(review): the snippet ends here; the invocation that supplies _this and
// closes the connection.on('ready', ...) call is not shown.
Upvotes: 13
Views: 10314
Reputation: 4267
In AMQP, queues and exchanges are concepts unrelated to a connection: they don't listen or broadcast, and you can't connect to them, only to a broker.
The RabbitMQ server does of course accept network connections, and the protocol defines a logical Connection on top of the transport. This connection includes a heartbeat, configurable with the heartbeat
option in this library.
Like you said, connection errors, including timeouts, need to be taken care of at startup; for the rest you can rely on the heartbeat, which is analogous to a "ping" mechanism. If the connection is interrupted and your heartbeat parameter is set, your library will simply throw an error, so there is no need for you to re-implement this.
You should also take a look at the reconnect
setting in postwait/node-amqp, as it might automatically deal with some of the network failure modes.
Upvotes: 4