David Armendariz
David Armendariz

Reputation: 1759

How to do a simple unit testing with RabbitMQ in python?

In my unit test, I want to simply start consuming, publish a message, and receive a response back and assert if the response is what I expect it to be. However, I have been trying to do this for hours and don't find a solution.

The problem is that I cannot define a method in a class that stops the consuming. I have tried defining a method like this:

def stop(self):
    self.channel.basic_cancel()
def stop(self):
    self.channel.stop_consuming()
def stop(self):
    self.connection.close()

But nothing seems to work. I have read that this is because once you execute start_consuming(), the only way to stop the consuming is to cancel it after a message is sent. But if I do this, then I would be modifying the original on_request and that wouldn't be useful for my application, because the connection will close after the first message. I have found pytest-rabbitmq but the documentation isn't clear for me and thus don't know if I can use this plugin to achieve what I want.

By the way, what is the difference between basic_cancel, stop_consuming and close?

Upvotes: 8

Views: 9706

Answers (3)

lamoboos223
lamoboos223

Reputation: 41

from unittest.mock import patch
import pika

def callback_func(channel, method, properties, body):
    print("Message consumed:", body)

@patch("pika.BlockingConnection", spec=pika.BlockingConnection)
def mock_publish(mock_conn):
    def side_effect_publish(exchange, routing_key, body):
        print(f"Message published to {routing_key}:", body)

    # reroute basic_publish call to side_effect_publish 
    mock_conn.return_value.channel.return_value.basic_publish.side_effect = side_effect_publish

    # execute publishing code
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.basic_publish(exchange="",
                    routing_key="QUEUE",
                    body="{ Hello World! }")
    connection.close()

@patch("pika.BlockingConnection", spec=pika.BlockingConnection)
def mock_consume(mock_conn):
    def side_effect_consume():
        callback_func(mock_conn.return_value.channel, None, None, "{ Hello World! }")

    # reroute start_consuming to side_effect_consume
    mock_conn.return_value.channel.return_value.start_consuming.side_effect = side_effect_consume

    # execute consuming code
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.basic_consume("QUEUE", on_message_callback=callback_func) # this on_message_callback does nothing
    channel.start_consuming()
    connection.close()

mock_publish()
mock_consume()

Upvotes: 0

PeterKogan
PeterKogan

Reputation: 321

Here is an example of the pytest-rabbitmq plugin:

import asyncio
import rabbitpy


async def test_queue(fastapi_client, mocker, rabbitmq):
    mock_consumer_method = mocker.MagicMock(name="consumer_method")
    mocker.patch(
        "my.project.consumer_method",
        new=mock_consumer_method,
    )

    connection_details = rabbitmq.args
    os.environ[
        "RABBITMQ_CONNECTION_STRING"
    ] = f"amqp://{connection_details['username']}:{connection_details['password']}@{connection_details['host']}:{connection_details['port']}/"  # this is used by my consumer

    channel = rabbitmq.channel()
    queue = rabbitpy.Queue(
        channel, name="QUEUE_NAME", durable=False, message_ttl=5 * 60 * 1000
    )
    queue.declare()
    message = rabbitpy.Message(channel, "some message")
    message.publish(exchange="", routing_key=PRODUCE_QUEUE_NAME)

    with fastapi_client:  # triggers the consumer startup event
        await asyncio.sleep(20)

    mock_consumer_method.assert_called_once_with("some message")

I already have a consumer as part of my FastAPI server, your use case might be different.

Upvotes: 4

bumblebee
bumblebee

Reputation: 1841

I am not getting a clear idea of your scenario!. With my understanding, you can create the connection and channel in the same method so that you can publish, consumer, assert and stop the consume when needed

Hope this helps!

def test_rabbitmq():
    from pika import BlockingConnection, ConnectionParameters, PlainCredentials

    conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
    channel = conn.channel()

    # define your consumer
    def on_message(channel, method_frame, header_frame, body):
        message = body.decode()
        # assert your message here
        # asset message == 'value'
        channel.basic_cancel('test-consumer')  # stops the consumer

    # define your publisher
    def publish_message(message):
        channel.basic_publish(exchange='', routing_key='', body=message')

    publish('your message')
    tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')

stop_consuming - Cancels all consumers, signalling the start_consuming loop to exit.

basic_cancel - This method cancels a consumer. A consumer tag will be taken as input.

close - Closes the connection/channel

Reference

Upvotes: 2

Related Questions