I have an Azure Function with an EventHub trigger. The events are processed in a batch, as an EventData[].
[FunctionName("EventHubTriggerCSharp")]
public async Task Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
    foreach (var message in eventHubMessages)
    {
        log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
        log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
        await service.CallAsync(message);
    }
}
How can I unit test a function that takes an EventData[] as its payload?
Upvotes: 1
Views: 3088
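You should be able to construct an array of EventData and pass it to the Run method along with a mock logger instance. The test below assumes the function class receives its IService dependency through the constructor, so that it can be replaced with a mock.
I used MSTest, but the same logic should apply with NUnit.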
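// Assumes the Microsoft.Azure.EventHubs package (settable SystemProperties) and Moq.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Extensions.Logging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class EventHubTriggerCSharpTests
{
    private readonly Mock<IService> _mockService = new Mock<IService>();
    private readonly Mock<ILogger> _mockLogger = new Mock<ILogger>();
    private EventHubTriggerCSharp _eventHubTriggerCSharp;

    [TestInitialize]
    public void Initialize()
    {
        // The function class is assumed to take IService via its constructor.
        _eventHubTriggerCSharp = new EventHubTriggerCSharp(_mockService.Object);
    }

    [TestMethod]
    public async Task WhenEventHubTriggersFunction_ThenCallAsyncWithEventHubMessage()
    {
        // arrange
        var data = "some data";
        var eventHubMessages = new List<EventData>
        {
            new EventData(Encoding.UTF8.GetBytes(data))
            {
                // The function logs EnqueuedTimeUtc, so SystemProperties must be populated.
                SystemProperties = new EventData.SystemPropertiesCollection(1, DateTime.UtcNow, "offset", "partitionKey")
            }
        };

        // act
        await _eventHubTriggerCSharp.Run(eventHubMessages.ToArray(), _mockLogger.Object);

        // assert
        _mockService.Verify(x => x.CallAsync(eventHubMessages[0]));
    }
}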
Note: I haven't actually run this test because of an issue with my Visual Studio, but hopefully you get the idea.
Upvotes: 1
Yes, you can unit test Azure Functions along the lines of the example below.
Here is an example test class for unit testing the Azure Function.
{
    "id": 1,
    "CustomerName": "Test User",
    "Country": "India",
    "JoiningDate": "28-09-2020",
    "PrimeUser": false
}
from unittest.mock import MagicMock
from unittest.mock import patch
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.eventhub import PartitionContext, EventData
import unittest

# receive_events is the function under test; import it from your own module
# (the module is not shown in this answer).


class TestStringMethods(unittest.TestCase):
    # patch decorators are applied bottom-up, so the blob store mock is the first argument.
    @patch('azure.eventhub.EventHubConsumerClient.from_connection_string')
    @patch('azure.eventhub.extensions.checkpointstoreblob.BlobCheckpointStore.from_connection_string')
    def test_eventhub_receive_events(self, mock_blob_checkpoint_store, mock_event_hub_consumer_client):
        mock_event_hub_consumer_client.return_value = MagicMock()
        mock_blob_checkpoint_store.return_value = MagicMock()

        # Fake receive(): deliver a single event to the callback instead of connecting.
        def receive(on_event, **kwargs):
            partition_context = MagicMock()
            partition_context.update_checkpoint = MagicMock()
            on_event(partition_context, EventData("test for Mock"))

        mock_event_hub_consumer_client.receive = receive
        with self.assertLogs('myprogram', level='INFO') as assert_execute:
            receive_events(mock_event_hub_consumer_client)
        self.assertIn("Accepting the event", assert_execute.output[0])


if __name__ == '__main__':
    unittest.main()
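The test above calls a receive_events function that the answer does not show. As a rough sketch only, a function compatible with that test might look like the following. The function bodies, the "myprogram" logger name, and the log wording are assumptions inferred from what the test asserts, not the original author's code.

```python
import logging

# Assumed logger name, chosen to match the 'myprogram' logger the test asserts on.
logger = logging.getLogger("myprogram")


def on_event(partition_context, event):
    """Handle one event: log it, then checkpoint the partition."""
    # Assumed message; the test only checks for the "Accepting the event" prefix.
    logger.info("Accepting the event from partition %s", partition_context.partition_id)
    partition_context.update_checkpoint(event)


def receive_events(client):
    """Start receiving from a consumer client that exposes receive(on_event=...)."""
    # starting_position="-1" asks the consumer to read from the start of the partition.
    client.receive(on_event=on_event, starting_position="-1")
```

Passing the client in as a parameter, rather than building it inside the function, is what lets the test substitute a mock whose receive method invokes the callback directly.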
Upvotes: 0