jcollum

Reputation: 46613

Is there an equivalent of "ping" for RabbitMQ? How can I diagnose whether an exchange or queue is broadcasting?

I'm using postwait/node-amqp to connect to a variety of RabbitMQ exchanges and queues in our organization.

As my project has moved from dev to production, I've encountered several issues with queues not being set up correctly, passwords being incorrect, etc. In the latter case it's obvious: I'll get an ECONNREFUSED error. In the first case, though, I don't get any errors, just a timeout on the connect.

Given a URI like amqp://USER:[email protected], how can I determine if a queue called "FooWorkItems.Work" is accepting connections for listening? What's the bare minimum code for this, the equivalent of checking if an API is listening or a server is up and listening on the ping port?

Code:

  if (this.amqpLib == null) {
    this.amqpLib = require('amqp');
  }
  this.connection = this.amqpLib.createConnection({
    url: this.endpoint
  });

  this.connection.on('ready', (function(_this) {
    return function() {
      var evt, _fn, _fn1, _i, _j, _len, _len1, _ref, _ref1;
      _this.logger.info("" + _this.stepInfo + " connected to " + _this.endpoint + "; connecting to " + queueName + "  now.");
      if (_this.fullLogging) {
        _ref = ['connect', 'heartbeat', 'data'];
        _fn = function(evt) {
          return _this.connection.on(evt, function() {
            _this.logger.trace("" + _this.stepInfo + " AMQP event: " + evt);
            if (arguments != null) {
              return _this.logger.trace({
                args: arguments
              });
            }
          });
        };
        for (_i = 0, _len = _ref.length; _i < _len; _i++) {
          evt = _ref[_i];
          _fn(evt);
        }
        _ref1 = ['error', 'close', 'blocked', 'unblocked'];
        _fn1 = function(evt) {
          return _this.connection.on(evt, function() {
            if (evt !== 'close') {
              return _this.logger.error("" + _this.stepInfo + " AMQP event: " + evt);
            } else {
              return _this.logger.warn("" + _this.stepInfo + " AMQP event: " + evt);
            }
          });
        };
        for (_j = 0, _len1 = _ref1.length; _j < _len1; _j++) {
          evt = _ref1[_j];
          _fn1(evt);
        }
      }
      return _this.connection.queue(_this.queueName, {
        passive: true
      }, function(q) {
        logger.debug("" + stepInfo + " connected to queue " + queueName + ". Init complete.");
        return q.subscribe(function(message, headers, deliveryInfo, messageObject) {
          logger.trace("" + stepInfo + " recvd message");
          return logger.trace({
            headers: headers
          });
        });
      });
    };
  })(this));

Upvotes: 13

Views: 10314

Answers (1)

istepaniuk

Reputation: 4267

In AMQP, queues and exchanges are concepts unrelated to a connection. They don't listen or broadcast, and you can't connect to them, only to a broker.

The RabbitMQ server does of course accept network connections, and the protocol defines a logical Connection on top of the transport. This connection includes a heartbeat, configurable with the heartbeat option in this library.

As you said, connection errors, including timeouts, need to be taken care of at startup. For the rest you can rely on the heartbeat, which is analogous to a "ping" mechanism. If the connection is interrupted and your heartbeat parameter is set, the library will simply raise an error, so there is no need for you to re-implement this.
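If you still want an explicit startup check, here is a minimal sketch rather than a definitive implementation. It connects to the broker, passively declares the queue (so nothing gets created if it is missing), and reports whichever happens first: success, an error, or a timeout. The name probeQueue, the 10-second heartbeat, and passing the library in as amqpLib are my own illustrative choices. I am also assuming that a failed passive declare surfaces as an 'error' event on the queue or the connection, which you should verify against your broker.

```javascript
// Hypothetical helper: a one-shot "ping" for a queue on a RabbitMQ broker.
// amqpLib is expected to be require('amqp'); it is passed in so the probe
// can be exercised without a live broker.
function probeQueue(amqpLib, url, queueName, timeoutMs, done) {
  var finished = false;
  var timer = null;

  var connection = amqpLib.createConnection({
    url: url,
    heartbeat: 10 // seconds; an assumed value, tune to your environment
  });

  // Report the first outcome only, then clean up.
  function finish(err) {
    if (finished) { return; }
    finished = true;
    clearTimeout(timer);
    try { connection.disconnect(); } catch (e) { /* already closed */ }
    done(err || null);
  }

  // Covers the "no error, just hangs" case from the question.
  timer = setTimeout(function () {
    finish(new Error('Timed out after ' + timeoutMs + ' ms probing ' + queueName));
  }, timeoutMs);

  connection.on('error', function (err) {
    finish(err || new Error('AMQP connection error'));
  });

  connection.on('ready', function () {
    // passive: true only checks that the queue exists; it never creates it.
    var q = connection.queue(queueName, { passive: true }, function () {
      finish(null);
    });
    if (q && typeof q.on === 'function') {
      q.on('error', function (err) {
        finish(err || new Error('Queue ' + queueName + ' is not available'));
      });
    }
  });
}

// Usage (assumes the amqp package is installed and ENDPOINT holds your URI):
//   probeQueue(require('amqp'), ENDPOINT, 'FooWorkItems.Work', 5000, function (err) {
//     console.log(err ? 'Probe failed: ' + err.message : 'Queue is reachable');
//   });
```

Keeping the timeout inside the probe means the caller always gets exactly one callback, even when the broker never answers.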

You should also take a look at the reconnect setting in postwait/node-amqp, as it might automatically deal with some of the network failure modes.
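For illustration, this is roughly how the heartbeat and reconnect settings can be combined. Treat it as a sketch under assumptions: the function name createResilientConnection, the 30-second heartbeat, and the backoff values are placeholders I picked, not recommendations. The reconnect settings go in the second argument to createConnection, as described in the node-amqp README.

```javascript
// Hypothetical helper: open a connection with heartbeat and automatic reconnect.
// amqpLib is expected to be require('amqp').
function createResilientConnection(amqpLib, url) {
  var options = {
    url: url,
    heartbeat: 30 // seconds; assumed value
  };

  // Library-level behaviour, passed as the second argument.
  var implOptions = {
    reconnect: true,
    reconnectBackoffStrategy: 'exponential', // 'linear' is the other strategy
    reconnectExponentialLimit: 120000,       // cap on the backoff, in ms
    reconnectBackoffTime: 1000               // initial delay between attempts, in ms
  };

  var connection = amqpLib.createConnection(options, implOptions);

  // Without a listener, an 'error' event would crash the process.
  connection.on('error', function (err) {
    console.error('AMQP connection error:', err && err.message);
  });

  return connection;
}
```

Automatic reconnects only help with failures after startup. A wrong queue name or bad credentials will keep failing on every attempt, so the startup check is still worth having.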

Upvotes: 4
