Dan H

Reputation: 14560

connection to two RabbitMQ servers

I'm using python with pika, and have the following two similar use cases:

  1. Connect to RabbitMQ server A and server B (at different IP addrs with different credentials), listen on exchange A1 on server A; when a message arrives, process it and send to an exchange on server B
  2. Open an HTTP listener and connect to RabbitMQ server B; when a specific HTTP request arrives, process it and send to an exchange on server B

Alas, in both these cases using my usual techniques, by the time I get to sending to server B the connection throws ConnectionClosed or ChannelClosed.

I assume this is the cause: while waiting on the incoming messages, the connection to server B (its "driver") is starved of CPU cycles and never gets a chance to service its connection socket, so it can't respond to heartbeats from server B, and the server shuts down the connection.

But I can't noodle out the fix. My current workaround is lame: I catch the ConnectionClosed, reopen a connection to server B, and retry sending my message.
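For reference, the catch-reconnect-retry workaround has roughly the shape below. This is a minimal sketch with no pika in it: `ConnectionLost`, `send_with_reconnect`, `StaleSender` and `FreshSender` are hypothetical names made up for illustration, and `connect` is assumed to be a zero-argument callable that returns a fresh sender. With pika, the `except` clause would name the ConnectionClosed and ChannelClosed exceptions mentioned above.

```python
class ConnectionLost(Exception):
    """Placeholder for the client's 'connection closed' errors."""


def send_with_reconnect(sender, connect, message, attempts=2):
    """Send `message`; on ConnectionLost, reconnect and retry.

    Returns the sender that succeeded so the caller can keep using it.
    Assumes attempts >= 1.
    """
    for attempt in range(attempts):
        try:
            sender.send(message)
            return sender
        except ConnectionLost:
            if attempt == attempts - 1:
                raise  # out of attempts, let the caller see the error
            sender = connect()  # reopen and try again


class StaleSender:
    """Simulates a connection the server has already closed."""

    def send(self, message):
        raise ConnectionLost("server closed the idle connection")


class FreshSender:
    """Simulates a newly opened, working connection."""

    def __init__(self):
        self.sent = []

    def send(self, message):
        self.sent.append(message)


fresh = FreshSender()
result = send_with_reconnect(StaleSender(), lambda: fresh, "hello")
print(result.sent)
```

It works, but every message sent after an idle period pays for a failed attempt plus a reconnect, which is why it feels like a patch rather than a fix.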

But what is the "right" way to do this? I've considered these, but don't really feel I have all the parts to solve this:

I've checked StackOverflow and Googled this for an hour last night: I can't for the life of me find a blog post or sample code for this.

Any input? Thanks a million!

Upvotes: 2

Views: 1479

Answers (1)

Karl Sutt

Reputation: 575

I managed to work it out, basing my solution on the documentation and an answer in the pika-python Google group.

First of all, your assumption is correct: the client process that's connected to server B, responsible for publishing, cannot reply to heartbeats if it's already blocking on something else, like waiting for a message from server A or blocking on an internal communication queue.

The crux of the solution is that the publisher should run as a separate thread and use BlockingConnection.process_data_events to service heartbeats and such. It looks like that method is supposed to be called in a loop that checks if the publisher still needs to run:

def run(self):
    while self.is_running:
        # Block at most 1 second before returning and re-checking
        self.connection.process_data_events(time_limit=1)
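To see why this loop is enough, here is a pika-free sketch of the mechanism. `FakeConnection` and `Worker` are stand-ins made up for illustration, not pika's implementation: the fake only mimics the two method names that matter here, with `add_callback_threadsafe` putting a callable on a queue and `process_data_events` running whatever is queued for up to `time_limit` seconds. The point it demonstrates is that callbacks scheduled from any thread end up executing on the thread that runs the loop.

```python
import queue
import threading
import time


class FakeConnection:
    """Stand-in for a connection; NOT pika, just the same two method names."""

    def __init__(self):
        self._callbacks = queue.Queue()

    def add_callback_threadsafe(self, callback):
        # Safe to call from any thread: queue.Queue does its own locking.
        self._callbacks.put(callback)

    def process_data_events(self, time_limit):
        # Run queued callbacks until the queue stays empty or time runs out.
        deadline = time.monotonic() + time_limit
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return
            try:
                callback = self._callbacks.get(timeout=remaining)
            except queue.Empty:
                return
            callback()


class Worker(threading.Thread):
    def __init__(self, connection):
        super().__init__(daemon=True)
        self.name = "Publisher"
        self.connection = connection
        self.is_running = True
        self.ran_on = []  # names of the threads that executed callbacks

    def run(self):
        while self.is_running:
            self.connection.process_data_events(time_limit=0.1)

    def record(self):
        self.ran_on.append(threading.current_thread().name)


worker = Worker(FakeConnection())
worker.start()

# Schedule work from the main thread; it executes on the worker thread.
for _ in range(3):
    worker.connection.add_callback_threadsafe(worker.record)

# The queue is FIFO, so this event fires after the three callbacks above.
done = threading.Event()
worker.connection.add_callback_threadsafe(done.set)
done.wait(timeout=5)

worker.is_running = False
worker.join(timeout=2)
print(worker.ran_on)
```

Every recorded name is the worker's, even though all the scheduling happened on the main thread. In the real thing, the same loop is also what gives the connection regular chances to answer heartbeats.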

Proof of concept

Since proving the full solution requires having two separate RabbitMQ instances running, I have put together a Git repo with an appropriate docker-compose.yml, the application code and comments to test this solution.

https://github.com/karls/rabbitmq-two-connections

Solution outline

Below is a sketch of the solution. Some notable things:

  1. Publisher runs as a separate thread
  2. The only "work" that the publisher does is servicing heartbeats and such, via BlockingConnection.process_data_events
  3. The publisher registers a callback whenever the consumer wants to publish a message, using BlockingConnection.add_callback_threadsafe
  4. The consumer takes the publisher as a constructor argument so it can publish the messages it receives, but it can work via any other mechanism as long as you have a reference to an instance of Publisher
  5. The code is taken from the linked Git repo, which is why certain details are hardcoded, e.g. the queue name etc. It will work with any RabbitMQ setup needed (direct-to-queue, topic exchange, fanout, etc).

import logging
import sys
import threading
import traceback
from typing import Optional

from pika import BlockingConnection, ConnectionParameters
from pika.channel import Channel

# Assumed logging setup; the linked repo may configure this differently.
logger = logging.getLogger(__name__)


class Publisher(threading.Thread):
    def __init__(
        self,
        connection_params: ConnectionParameters,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.is_running = True
        self.name = "Publisher"
        self.queue = "downstream_queue"
        self.connection = BlockingConnection(connection_params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, auto_delete=True)
        self.channel.confirm_delivery()

    def run(self):
        while self.is_running:
            self.connection.process_data_events(time_limit=1)

    def _publish(self, message):
        logger.info("Calling '_publish'")
        self.channel.basic_publish("", self.queue, body=message.encode())

    def publish(self, message):
        logger.info("Calling 'publish'")
        self.connection.add_callback_threadsafe(lambda: self._publish(message))

    def stop(self):
        logger.info("Stopping...")
        self.is_running = False

        # Wait for the while-loop in .run() to notice the flag and exit
        # before closing the connection (it re-checks at most every second).
        # BlockingConnection is not thread-safe, so .process_data_events
        # should only ever be called from the publisher thread itself.
        if self.is_alive():
            self.join(timeout=2)

        if self.connection.is_open:
            self.connection.close()
            logger.info("Connection closed")
        logger.info("Stopped")


class Consumer:
    def __init__(
        self,
        connection_params: ConnectionParameters,
        publisher: Optional["Publisher"] = None,
    ):
        self.publisher = publisher
        self.queue = "upstream_queue"
        self.connection = BlockingConnection(connection_params)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue, auto_delete=True)
        self.channel.basic_qos(prefetch_count=1)

    def start(self):
        self.channel.basic_consume(
            queue=self.queue, on_message_callback=self.on_message
        )
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            logger.info("Warm shutdown requested...")
        except Exception:
            traceback.print_exception(*sys.exc_info())
        finally:
            self.stop()

    def on_message(self, _channel: Channel, m, _properties, body):
        try:
            message = body.decode()
            logger.info(f"Got: {message!r}")
            if self.publisher:
                self.publisher.publish(message)
            else:
                logger.info(f"No publisher provided, printing message: {message!r}")
            self.channel.basic_ack(delivery_tag=m.delivery_tag)
        except Exception:
            traceback.print_exception(*sys.exc_info())
            self.channel.basic_nack(delivery_tag=m.delivery_tag, requeue=False)

    def stop(self):
        logger.info("Stopping consuming...")
        if self.connection.is_open:
            logger.info("Closing connection...")
            self.connection.close()

        if self.publisher:
            self.publisher.stop()

        logger.info("Stopped")
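The second use case from the question (an HTTP listener that publishes to server B) can reuse the same publisher, since all the handler needs is a reference to something with a `publish(message)` method. Below is a minimal sketch of that wiring using only the standard library. `StubPublisher`, `make_handler` and the 202 response are assumptions made for illustration; the stub stands in for the Publisher thread above so the example runs without RabbitMQ.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class StubPublisher:
    """Placeholder exposing the same publish(message) method as Publisher."""

    def __init__(self):
        self.published = []

    def publish(self, message):
        self.published.append(message)


def make_handler(publisher):
    """Build a request handler class bound to the given publisher."""

    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            message = self.rfile.read(length).decode()
            publisher.publish(message)  # hand the message to the publisher
            self.send_response(202)
            self.end_headers()

        def log_message(self, *args):
            pass  # keep the example quiet

    return Handler


publisher = StubPublisher()
# Port 0 asks the OS for any free port.
server = HTTPServer(("127.0.0.1", 0), make_handler(publisher))
threading.Thread(target=server.serve_forever, daemon=True).start()

# Send one request to the listener, as an external client would.
client = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
client.request("POST", "/", body=b"hello")
status = client.getresponse().status
client.close()

server.shutdown()
server.server_close()
print(status, publisher.published)
```

With the real Publisher, the handler runs on the HTTP server's thread, which is fine because `publish` only schedules the work through add_callback_threadsafe and the actual basic_publish happens on the publisher thread.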

Upvotes: 3
