Reputation: 25
I'm trying to write unit tests using MassTransit. When looking online, I found the best way to access the Bus would be creating it using an InMemoryTestHarness
. I add my consumers and a PublishObserver
to get the resulting behavior.
In the example below, I send a TestRequest
message to the bus, then my consumer reads the request and puts a TestResponse
message back on the bus. Finally, an observer gets the response.
I don't know if the problem is with some configuration I'm missing, or if there is some task I'm not waiting, but the request message never even arrives at the consumer.
What am I missing?
The test
[TestMethod]
public void RequestResponseBusTest()
{
var harness = new InMemoryTestHarness();
var consumer = new TestConsumer();
harness.OnConfigureInMemoryBus += c =>
{
c.ReceiveEndpoint("testqueue", e =>
e.Consumer(() => consumer));
};
var observer = new TestPublishObserver();
harness.OnConnectObservers += c =>
{
c.ConnectPublishObserver(observer);
};
harness.Start().Wait();
var bus = harness.Bus;
bus.Publish(new TestRequest() { X = 99 }).Wait();
Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");
}
And supporting classes
[Serializable]
public class TestRequest
{
public int X { get; set; }
}
[Serializable]
public class TestResponse
{
public int Y { get; set; }
}
public class TestConsumer : IConsumer<TestRequest>
{
public List<TestRequest> ConsumedMessages { get; } = new List<TestRequest>();
public Task Consume(ConsumeContext<TestRequest> context)
{
ConsumedMessages.Add(context.Message);
context.Publish(new TestResponse() { Y = 123 }).Wait();
return Task.CompletedTask;
}
}
private class TestPublishObserver : IPublishObserver
{
public List<TestRequest> PublishedRequests { get; } = new List<TestRequest>();
public List<TestResponse> PublishedResponses { get; } = new List<TestResponse>();
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
var msg = context.Message;
if (msg is TestRequest)
PublishedRequests.Add((TestRequest)(object)msg);
if (msg is TestResponse)
PublishedResponses.Add((TestResponse)(object)msg);
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}
Upvotes: 0
Views: 3284
Reputation: 984
you need to add Thread.Sleep(2000) after bus.Publish(new TestRequest() { X = 99 }).Wait(); bus.Publish does not guarantee message delivery. When you call the Wait() method, you simply wait for it to be sent, not processed
OR!!!
[TestMethod]
public void RequestResponseBusTest()
{
var harness = new InMemoryTestHarness();
var consumer = new TestConsumer();
harness.OnConfigureInMemoryBus += c =>
{
c.ReceiveEndpoint("testqueue", e =>
e.Consumer(() => consumer));
};
var observer = new TestPublishObserver();
harness.OnConnectObservers += c =>
{
c.ConnectPublishObserver(observer);
};
harness.Start().Wait();
var bus = harness.Bus;
bus.Publish(new TestRequest() { X = 99 }).Wait();
//add this line
var receivedMessage = harness.Consumed.Select<TestRequest>().FirstOrDefault();
Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");
}
Upvotes: 1