fafl

Reputation: 7385

Robust aio-pika connection to multiple RabbitMQ hosts

In our setup we have a central RabbitMQ instance running on three hosts, each with its own URL.

For maintenance, any of these hosts may go down at any time for several hours. When this happens, we would like to connect to one of the other hosts.

We have been using aio_pika.connect_robust to connect, but it only accepts a single host as a parameter.
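For illustration, this is roughly how we connect today (host name and credentials are placeholders):

import aio_pika

async def get_connection():
    # connect_robust reconnects automatically, but only ever to this one URL
    return await aio_pika.connect_robust('amqp://user:password@rabbit-host-1/')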

It would be perfect if the reconnect could happen seamlessly in the background. A worker could get a message from the connection to one host, work on it and then acknowledge it over a different connection.

What would be the best way to solve this?

Upvotes: 3

Views: 3315

Answers (2)

fafl

Reputation: 7385

So in the end I figured out a way to do it. As I described in the comment on Luke's answer, I was not able to fix a broken channel. Instead, I chose to save the output of a worker before trying to acknowledge the message. The acknowledgement fails when the connection is broken, so the message is never acknowledged and will be delivered to the worker again, starting a new coroutine, which can then reuse the saved result.

I hope this code makes it clear:

import asyncio
import logging

import aio_pika
import pika.exceptions  # aio-pika was built on pika at the time, hence its exceptions

logger = logging.getLogger(__name__)

MAX_MESSAGES_IN_PARALLEL = 10  # placeholder value

# get_rabbitmq_connection_uris() and on_bar_message() are defined elsewhere


def on_foo_message(connection_loss_event):
    async def context_aware_on_message(message: aio_pika.IncomingMessage):

        try:
            # Obtain a message-specific lock, so only one
            # coroutine can work on a message

            # Check if the result was already calculated
            # If yes, just ack here and return

            # Work on the message

            # Save the result

            # Ack the message

        except pika.exceptions.ConnectionClosed:
            # This can happen while interacting with channel or message
            connection_loss_event.set()

    return context_aware_on_message


async def worker():
    rabbit_urls = get_rabbitmq_connection_uris()
    url_index = 0

    while True:

        try:
            # Connect to rabbit with a plain (non-robust) connection;
            # reconnect handling is done manually in this loop
            url_index = url_index % len(rabbit_urls)
            url = rabbit_urls[url_index]
            connection = await aio_pika.connect(url)
            channel = await connection.channel()
            logger.info(f'Connected to rabbit at {url}')

            # Configure queues
            await channel.set_qos(prefetch_count=MAX_MESSAGES_IN_PARALLEL)
            foo_queue = await channel.declare_queue('foo', durable=True)
            bar_queue = await channel.declare_queue('bar', durable=True)

            # Start listening to queues
            connection_loss_event = asyncio.Event()
            await foo_queue.consume(
                on_foo_message(connection_loss_event))
            logger.info(f'Now listening to queue "foo"')
            await bar_queue.consume(
                on_bar_message(connection_loss_event))
            logger.info(f'Now listening to queue "bar"')

            # Wait for connection loss, then jump to the retry logic
            # below by raising the same exception a failed connect raises
            await connection_loss_event.wait()
            raise ConnectionRefusedError()

        except ConnectionRefusedError:
            logger.info('No connection to rabbit, will try next url')
            url_index += 1
            await asyncio.sleep(2)


def main():
    loop = asyncio.get_event_loop()
    loop.create_task(worker())
    loop.run_forever()

if __name__ == '__main__':
    main()

Upvotes: 1

Luke Bakken

Reputation: 9647

A worker could get a message from the connection to one host, work on it and then acknowledge it over a different connection

That's not possible, since acks are tied to channels. When the first channel closes, RabbitMQ re-enqueues the message and re-delivers it to another consumer.
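Here is a minimal sketch of that failure mode, assuming a recent aio-pika and a broker on localhost (queue name and credentials are placeholders; the exact exception type depends on the aio-pika version):

import asyncio

import aio_pika

async def main():
    connection = await aio_pika.connect('amqp://guest:guest@localhost/')
    channel = await connection.channel()
    queue = await channel.declare_queue('ack-demo', auto_delete=True)
    await channel.default_exchange.publish(
        aio_pika.Message(b'payload'), routing_key='ack-demo')

    message = await queue.get(timeout=5)

    # Simulate losing the host that delivered the message
    await connection.close()

    try:
        # The delivery tag only has meaning on the channel that
        # delivered the message, so no other connection can ack it
        await message.ack()
    except Exception as exc:
        print(f'ack after connection loss failed: {exc!r}')

asyncio.run(main())

RabbitMQ re-enqueues the unacked message as soon as the original channel dies.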

It looks as though aio-pika does not support connecting to one of several hosts. I recommend either trapping connection-related exceptions yourself to choose another host, or putting HAProxy between your application and RabbitMQ.
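As a minimal sketch of the first option (connect_any and the URLs are hypothetical; exception types depend on the aio-pika version):

import aio_pika
from aio_pika.exceptions import AMQPConnectionError

async def connect_any(urls):
    # Try each URL in turn and return the first connection that succeeds
    last_error = None
    for url in urls:
        try:
            return await aio_pika.connect_robust(url)
        except (AMQPConnectionError, OSError) as exc:
            last_error = exc
    raise last_error

# Usage:
#   connection = await connect_any([
#       'amqp://user:password@rabbit-1/',
#       'amqp://user:password@rabbit-2/',
#       'amqp://user:password@rabbit-3/',
#   ])

Note that connect_robust only ever reconnects to the URL it was created with, so if that host stays down for hours you still need a retry loop like this one level up; HAProxy avoids the problem entirely by putting all the nodes behind one address.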

Upvotes: 2
