tsaebeht
tsaebeht

Reputation: 1680

How to mock Azure ServiceBus Receiver in Python MagicMock

I'm using the Python Azure ServiceBus package and have the following snippet of code:

# receiver.py

import logging
logger = logging.getLogger("test")

def receive_message(
        connection_str,
        queue_name
) -> None:
    """
    Call Azure ServiceBus API to retrieve message from a queue
    """
    with ServiceBusClient.from_connection_string(
            connection_str, logging_enable=True
    ) as servicebus_client:

        with servicebus_client.get_queue_receiver(
                queue_name=queue_name,
                max_wait_time=20,
                receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
        ) as receiver:
            for message in receiver:
                logger.debug(f"Received message {message}")

I'm attempting to write unit tests for this function, and want to be able to mock out the recevier. Here is my attempt at writing the unit test, which fails because I can't figure out how to get the test to enter the for message in receiver block.

# test_receiver.py

@patch("receiver.ServiceBusClient")
@patch("receiver.logger")
def test_receive_message(mock_logger, mock_svcbus_client):

    # Figure out how to mock
    mock_svcbus_client.from_connection_string.return_value.get_queue_receiver.return_value = iter(["message"])


    receive_message("mock_connection_str", "mock_q_name")
    
    # Assertion fails
    mock_logger.return_value.debug.assert_called_once()

Upvotes: 5

Views: 1934

Answers (2)

Péter Szilvási
Péter Szilvási

Reputation: 2009

It is possible to mock the receiver that is returned from the get_queue_receiver method. I have an example for the async version of the service bus client but it is easy to adapt it to a synchronized version if necessary. Therefore, install the following packages:

pip install azure-servicebus pytest pytest-asyncio

First, you need to inject the ServiceBusClient instance to mock it in the test code. Create a class where the constructor takes the ServiceBusClient instance:

# azure_servicebus.py
from azure.servicebus.aio import ServiceBusClient


class AzureServiceBus():
    def __init__(self, client: ServiceBusClient) -> None:
        self._client = client

    async def receive_events_async(self, queue_name: str) -> None:
        async with self._client.get_queue_receiver(queue_name) as receiver:
            messages = await receiver.receive_messages()
            for message in messages:
                print(str(message))
                await receiver.complete_message(message)

In the test file, the AsyncMock class and the @pytest.mark.asyncio decorator can be used to test the async/await code. Our focus is to patch the get_queue_receiver statement:

# azure_servicebus_test.py
from unittest.mock import AsyncMock, patch

import pytest
from azure.servicebus.aio import ServiceBusClient
from module.path.for.azure_servicebus import AzureServiceBus


@pytest.mark.asyncio
async def test_receive_message():
    mock_receiver = AsyncMock()
    mock_receiver.__aenter__.return_value = mock_receiver
    mock_receiver.__aexit__.return_value = False
    mock_receiver.receive_messages.return_value= ["message 1", "message 2"]
    mock_receiver.complete_message.return_value = None

    with patch("azure.servicebus.aio.ServiceBusClient.get_queue_receiver", return_value=mock_receiver):
        connstr = "Endpoint=sb://service-bus-hostname-test.servicebus.windows.net/;SharedAccessKeyName=shared-access-key-test;SharedAccessKey=sharedaccesskeytest=;EntityPath=topic-path-test"
        async with ServiceBusClient.from_connection_string(connstr) as client:
            service_bus = AzureServiceBus(client)
            await service_bus.receive_events_async("my_queue_name")

The __aenter__ and the __aexit__ methods need to be set because we are using the async context manager. Specifically, the async with will invoke these methods.

Another important part is the form of the connection string. If it is not well-formed, then the initialization will throw an exception. You don't need a valid connection string but a correctly formed one.

The output of the test should look like this:

$ pytest <path_to_test>/azure_servicebus_test.py -vs
=================================================================================================== test session starts ====================================================================================================
platform win32 -- Python 3.10.11, pytest-7.4.4, pluggy-1.5.0 -- C:\<path-to-virtual_env>\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: C:\<path_to_root_dir>
configfile: pyproject.toml
plugins: anyio-4.4.0, asyncio-0.23.7, azurepipelines-1.0.5, cov-4.1.0, mock-3.14.0, nunit-1.0.7
asyncio: mode=strict
collected 1 item

<path_to_test>/azure_servicebus_test.py::test_receive_message message 1
message 2
PASSED

The two messages are received from the mocked receiver instance.

Note: For the synchronized version, you do not need the pytest-asyncio plugin, the decorator, and the async/await keywords. Moreover, use the MagicMock instead of AsyncMock class, and set the synchronized context manager (namely the __enter__ and the __exit__ methods).

Upvotes: 1

Ecstasy
Ecstasy

Reputation: 1864

You can try from mocks import MockReceivedMessage, MockReceiver to mock the receiver

Example 1:

class MockReceivedMessage(ServiceBusReceivedMessage):
    def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False, **kwargs):
        self._lock_duration = kwargs.get("lock_duration", 2)
        self._raw_amqp_message = None
        self._received_timestamp_utc = utc_now()
        self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
        self._settled = False
        self._receiver = MockReceiver()

        self._prevent_renew_lock = prevent_renew_lock
        self._exception_on_renew_lock = exception_on_renew_lock

Example 2:

 def test_queue_message_receive_and_delete(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
        
        with ServiceBusClient.from_connection_string(
            servicebus_namespace_connection_string, logging_enable=False) as sb_client:
                
            with sb_client.get_queue_sender(servicebus_queue.name) as sender:
                message = ServiceBusMessage("Receive and delete test")
                sender.send_messages(message)
    
            with sb_client.get_queue_receiver(servicebus_queue.name,
                                                 receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE) as receiver:
                messages = receiver.receive_messages(max_wait_time=10)
                assert len(messages) == 1
                message = messages[0]
                print_message(_logger, message)
                with pytest.raises(ValueError):
                    receiver.complete_message(message)
                with pytest.raises(ValueError):
                    receiver.abandon_message(message)
                with pytest.raises(ValueError):
                    receiver.defer_message(message)
                with pytest.raises(ValueError):
                    receiver.dead_letter_message(message)
                with pytest.raises(ValueError):
                    receiver.renew_message_lock(message)
    
            time.sleep(10)
    
            with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
                messages = receiver.receive_messages(max_wait_time=10)
                for m in messages:
                    print_message(_logger, m)
                assert len(messages) == 0

You can refer to mocks.py and test_queues.py

If you still have doubt, You can open an issue on GitHub: azure-sdk-for-python

Upvotes: 3

Related Questions