Reputation: 1759
In my unit test, I simply want to start consuming, publish a message, receive a response, and assert that the response is what I expect. However, I have been trying to do this for hours and haven't found a solution.
The problem is that I cannot define a method on my class that stops the consuming. I have tried each of the following definitions:
def stop(self):
    self.channel.basic_cancel()
def stop(self):
    self.channel.stop_consuming()
def stop(self):
    self.connection.close()
But nothing seems to work. I have read that this is because once you execute start_consuming(), the only way to stop the consuming is to cancel it after a message is sent. But if I did this, I would be modifying the original on_request, and that wouldn't be useful for my application, because the connection would close after the first message. I have found pytest-rabbitmq, but its documentation isn't clear to me, so I don't know whether I can use this plugin to achieve what I want.
By the way, what is the difference between basic_cancel, stop_consuming and close?
Upvotes: 8
Views: 9706
Reputation: 41
from unittest.mock import patch

import pika


def callback_func(channel, method, properties, body):
    print("Message consumed:", body)


@patch("pika.BlockingConnection", spec=pika.BlockingConnection)
def mock_publish(mock_conn):
    def side_effect_publish(exchange, routing_key, body):
        print(f"Message published to {routing_key}:", body)

    # reroute basic_publish call to side_effect_publish
    mock_conn.return_value.channel.return_value.basic_publish.side_effect = side_effect_publish

    # execute publishing code
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.basic_publish(exchange="",
                          routing_key="QUEUE",
                          body="{ Hello World! }")
    connection.close()


@patch("pika.BlockingConnection", spec=pika.BlockingConnection)
def mock_consume(mock_conn):
    def side_effect_consume():
        # pass the mocked channel object (channel.return_value), not the channel() method
        callback_func(mock_conn.return_value.channel.return_value, None, None, "{ Hello World! }")

    # reroute start_consuming to side_effect_consume
    mock_conn.return_value.channel.return_value.start_consuming.side_effect = side_effect_consume

    # execute consuming code
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # this on_message_callback is never invoked, because the channel is a mock
    channel.basic_consume("QUEUE", on_message_callback=callback_func)
    channel.start_consuming()
    connection.close()


mock_publish()
mock_consume()
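If the goal is to assert on the reply rather than print it, the same mocking idea can be applied to the callback itself: call it directly with a mock channel, so start_consuming() never has to run. This is only a minimal sketch. It assumes an RPC-style on_request callback that publishes its reply to properties.reply_to; the handler body and the test name are illustrative and not taken from the question's code.

```python
from unittest.mock import MagicMock


def on_request(channel, method, properties, body):
    """Hypothetical RPC handler: upper-cases the request and replies."""
    response = body.decode().upper()
    channel.basic_publish(
        exchange="",
        routing_key=properties.reply_to,
        body=response.encode(),
    )
    channel.basic_ack(delivery_tag=method.delivery_tag)


def test_on_request_replies():
    # Stand-ins for the objects pika would pass to the callback.
    channel = MagicMock(name="channel")
    method = MagicMock(delivery_tag=1)
    properties = MagicMock(reply_to="reply-queue")

    on_request(channel, method, properties, b"hello")

    # The handler should publish exactly one reply and ack the request.
    channel.basic_publish.assert_called_once_with(
        exchange="", routing_key="reply-queue", body=b"HELLO"
    )
    channel.basic_ack.assert_called_once_with(delivery_tag=1)
```

Because no connection is opened, nothing needs to be stopped afterwards, and the production callback stays unchanged.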
Upvotes: 0
Reputation: 321
Here is an example using the pytest-rabbitmq plugin:
import asyncio
import os  # needed for os.environ below

import rabbitpy


async def test_queue(fastapi_client, mocker, rabbitmq):
    # replace the real consumer method with a mock so the call can be asserted
    mock_consumer_method = mocker.MagicMock(name="consumer_method")
    mocker.patch(
        "my.project.consumer_method",
        new=mock_consumer_method,
    )
    connection_details = rabbitmq.args
    os.environ[
        "RABBITMQ_CONNECTION_STRING"
    ] = f"amqp://{connection_details['username']}:{connection_details['password']}@{connection_details['host']}:{connection_details['port']}/"  # this is used by my consumer
    channel = rabbitmq.channel()
    queue = rabbitpy.Queue(
        channel, name="QUEUE_NAME", durable=False, message_ttl=5 * 60 * 1000
    )
    queue.declare()
    message = rabbitpy.Message(channel, "some message")
    # PRODUCE_QUEUE_NAME is not defined in this snippet; presumably it is the queue declared above
    message.publish(exchange="", routing_key=PRODUCE_QUEUE_NAME)
    with fastapi_client:  # triggers the consumer startup event
        await asyncio.sleep(20)
    mock_consumer_method.assert_called_once_with("some message")
I already have a consumer as part of my FastAPI server, so your use case might be different.
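A fixed asyncio.sleep(20) makes every run take the full 20 seconds. One possible refinement is to poll until the mock has been called, assuming the consumer runs outside the test's own coroutine (for example in a thread started by the test client). The helper name wait_until and its parameters are hypothetical; they are not part of pytest-rabbitmq or FastAPI.

```python
import asyncio
import time


async def wait_until(predicate, timeout=20.0, interval=0.1):
    """Poll predicate() until it is truthy or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        # Yield to the event loop between checks.
        await asyncio.sleep(interval)
    # One last check at the deadline.
    return bool(predicate())
```

Inside the test it could replace the sleep, for instance with `assert await wait_until(lambda: mock_consumer_method.called)`, so the test finishes as soon as the message has been consumed.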
Upvotes: 4
Reputation: 1841
I don't have a clear picture of your scenario, but as I understand it, you can create the connection and channel in the same method, so that you can publish, consume, assert, and stop consuming when needed.
Hope this helps!
def test_rabbitmq():
    from pika import BlockingConnection, ConnectionParameters, PlainCredentials

    conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
    channel = conn.channel()

    # define your consumer
    def on_message(channel, method_frame, header_frame, body):
        message = body.decode()
        # assert your message here
        # assert message == 'value'
        channel.basic_cancel('test-consumer')  # stops the consumer

    # define your publisher
    def publish_message(message):
        # the default exchange ('') routes by queue name, so the routing key must match the queue
        channel.basic_publish(exchange='', routing_key='queue', body=message)

    publish_message('your message')
    tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')
    # blocks until on_message cancels the only consumer, then returns
    channel.start_consuming()
    conn.close()
stop_consuming - Cancels all consumers, signalling the start_consuming loop to exit.
basic_cancel - This method cancels a consumer. A consumer tag will be taken as input.
close - Closes the connection/channel.
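For a test that only needs to wait for one response, an alternative to start_consuming() is the generator returned by pika's BlockingChannel.consume(), which accepts an inactivity_timeout and is shut down with channel.cancel(). The following is a rough sketch rather than a definitive recipe: the helper name receive_one is hypothetical, and it assumes the channel is a pika BlockingChannel and that the service under test is already running elsewhere.

```python
def receive_one(channel, queue, timeout=5.0):
    """Return the body of the next message on `queue`, or None on timeout."""
    body = None
    for method, _properties, payload in channel.consume(
        queue, inactivity_timeout=timeout
    ):
        # consume() yields (None, None, None) when the timeout expires.
        if method is not None:
            channel.basic_ack(method.delivery_tag)
            body = payload
        break  # one message (or one timeout) is enough for the test
    channel.cancel()  # cancels the consumer created by consume()
    return body
```

A test could then publish a request, call `receive_one(channel, 'reply-queue')`, and assert on the returned body, without touching the on_request callback used in production.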
Upvotes: 2