Reputation: 1680
I'm using the Python Azure ServiceBus package and have the following snippet of code:
# receiver.py
import logging
logger = logging.getLogger("test")
def receive_message(
connection_str,
queue_name
) -> None:
"""
Call Azure ServiceBus API to retrieve message from a queue
"""
with ServiceBusClient.from_connection_string(
connection_str, logging_enable=True
) as servicebus_client:
with servicebus_client.get_queue_receiver(
queue_name=queue_name,
max_wait_time=20,
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
) as receiver:
for message in receiver:
logger.debug(f"Received message {message}")
I'm attempting to write unit tests for this function, and want to be able to mock out the recevier
. Here is my attempt at writing the unit test, which fails because I can't figure out how to get the test to enter the for message in receiver
block.
# test_receiver.py
@patch("receiver.ServiceBusClient")
@patch("receiver.logger")
def test_receive_message(mock_logger, mock_svcbus_client):
# Figure out how to mock
mock_svcbus_client.from_connection_string.return_value.get_queue_receiver.return_value = iter(["message"])
receive_message("mock_connection_str", "mock_q_name")
# Assertion fails
mock_logger.return_value.debug.assert_called_once()
Upvotes: 5
Views: 1934
Reputation: 2009
It is possible to mock the receiver that is returned from the get_queue_receiver
method. I have an example for the async
version of the service bus client but it is easy to adapt it to a synchronized version if necessary. Therefore, install the following packages:
pip install azure-servicebus pytest pytest-asyncio
First, you need to inject the ServiceBusClient
instance to mock it in the test code. Create a class where the constructor takes the ServiceBusClient
instance:
# azure_servicebus.py
from azure.servicebus.aio import ServiceBusClient
class AzureServiceBus():
def __init__(self, client: ServiceBusClient) -> None:
self._client = client
async def receive_events_async(self, queue_name: str) -> None:
async with self._client.get_queue_receiver(queue_name) as receiver:
messages = await receiver.receive_messages()
for message in messages:
print(str(message))
await receiver.complete_message(message)
In the test file, the AsyncMock
class and the @pytest.mark.asyncio
decorator can be used to test the async/await code. Our focus is to patch the get_queue_receiver
statement:
# azure_servicebus_test.py
from unittest.mock import AsyncMock, patch
import pytest
from azure.servicebus.aio import ServiceBusClient
from module.path.for.azure_servicebus import AzureServiceBus
@pytest.mark.asyncio
async def test_receive_message():
mock_receiver = AsyncMock()
mock_receiver.__aenter__.return_value = mock_receiver
mock_receiver.__aexit__.return_value = False
mock_receiver.receive_messages.return_value= ["message 1", "message 2"]
mock_receiver.complete_message.return_value = None
with patch("azure.servicebus.aio.ServiceBusClient.get_queue_receiver", return_value=mock_receiver):
connstr = "Endpoint=sb://service-bus-hostname-test.servicebus.windows.net/;SharedAccessKeyName=shared-access-key-test;SharedAccessKey=sharedaccesskeytest=;EntityPath=topic-path-test"
async with ServiceBusClient.from_connection_string(connstr) as client:
service_bus = AzureServiceBus(client)
await service_bus.receive_events_async("my_queue_name")
The __aenter__
and the __aexit__
methods need to be set because we are using the async context manager. Specifically, the async with
will invoke these methods.
Another important part is the form of the connection string. If it is not well-formed, then the initialization will throw an exception. You don't need a valid connection string but a correctly formed one.
The output of the test should look like this:
$ pytest <path_to_test>/azure_servicebus_test.py -vs
=================================================================================================== test session starts ====================================================================================================
platform win32 -- Python 3.10.11, pytest-7.4.4, pluggy-1.5.0 -- C:\<path-to-virtual_env>\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: C:\<path_to_root_dir>
configfile: pyproject.toml
plugins: anyio-4.4.0, asyncio-0.23.7, azurepipelines-1.0.5, cov-4.1.0, mock-3.14.0, nunit-1.0.7
asyncio: mode=strict
collected 1 item
<path_to_test>/azure_servicebus_test.py::test_receive_message message 1
message 2
PASSED
The two messages are received from the mocked receiver instance.
Note: For the synchronized version, you do not need the pytest-asyncio
plugin, the decorator, and the async/await keywords. Moreover, use the MagicMock
instead of AsyncMock
class, and set the synchronized context manager (namely the __enter__
and the __exit__
methods).
Upvotes: 1
Reputation: 1864
You can try from mocks import MockReceivedMessage, MockReceiver
to mock the receiver
Example 1:
class MockReceivedMessage(ServiceBusReceivedMessage):
def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False, **kwargs):
self._lock_duration = kwargs.get("lock_duration", 2)
self._raw_amqp_message = None
self._received_timestamp_utc = utc_now()
self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
self._settled = False
self._receiver = MockReceiver()
self._prevent_renew_lock = prevent_renew_lock
self._exception_on_renew_lock = exception_on_renew_lock
Example 2:
def test_queue_message_receive_and_delete(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
message = ServiceBusMessage("Receive and delete test")
sender.send_messages(message)
with sb_client.get_queue_receiver(servicebus_queue.name,
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE) as receiver:
messages = receiver.receive_messages(max_wait_time=10)
assert len(messages) == 1
message = messages[0]
print_message(_logger, message)
with pytest.raises(ValueError):
receiver.complete_message(message)
with pytest.raises(ValueError):
receiver.abandon_message(message)
with pytest.raises(ValueError):
receiver.defer_message(message)
with pytest.raises(ValueError):
receiver.dead_letter_message(message)
with pytest.raises(ValueError):
receiver.renew_message_lock(message)
time.sleep(10)
with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
messages = receiver.receive_messages(max_wait_time=10)
for m in messages:
print_message(_logger, m)
assert len(messages) == 0
You can refer to mocks.py and test_queues.py
If you still have doubt, You can open an issue on GitHub: azure-sdk-for-python
Upvotes: 3