Reputation: 23
When I upgraded the MassTransit package in my project from version 3.2.4 to version 6.2.3, some existing code stopped working. I use the ConnectHandler extension to attach message handlers to the bus after the bus has been created and configured. This worked before the upgrade, but now the handler registered this way never fires. Registering the handler through ReceiveEndpoint during bus configuration still works with both MassTransit versions. Here is some simplified code:
[TestFixture]
public class When_MassTransit_handler_should_consume_published_message
{
    [Test]
    public void This_works_fine_with_MassTransit_3_2_4_but_never_enters_handler_with_MassTransit_6_2_3()
    {
        var someEvent = new SomeEvent { Data = 123 };
        var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();
        var bus = Bus.Factory.CreateUsingInMemory(config => { });
        bus.Start();
        bus.ConnectHandler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
        bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();
        if (!tcs.Task.Wait(5000))
            Assert.Fail("Event consuming takes too long (> 5000 ms)"); // fails here for 6.2.3 because the handler never fires
        Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));
        bus.Stop();
    }

    [Test]
    public void This_works_fine_with_both_MassTransit_versions()
    {
        var someEvent = new SomeEvent { Data = 123 };
        var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();
        var bus = Bus.Factory.CreateUsingInMemory(config =>
        {
            config.ReceiveEndpoint("input_queue", endpoint =>
            {
                endpoint.Handler<SomeEvent>(c => Task.Run(() => tcs.SetResult(c)));
            });
        });
        bus.Start();
        bus.Publish(someEvent, someEvent.GetType(), context => { }).Wait();
        if (!tcs.Task.Wait(5000))
            Assert.Fail("Event consuming takes too long (> 5000 ms)");
        Assert.That(someEvent.Data, Is.EqualTo(tcs.Task.Result.Message.Data));
        bus.Stop();
    }

    class SomeEvent
    {
        public int Data;
    }
}
Was there a breaking change in MassTransit that caused this behavior? Is this an issue with the MassTransit extension method, or is my code outdated and in need of adjustment for the most recent version? If so, what adjustments are required? Is there an alternative way to attach and detach a handler after the bus has been created and configured?
Upvotes: 2
Views: 182
Reputation: 33278
Short answer? Yes, the in-memory transport was changed so that it behaves more like RabbitMQ. In fact, it was completely rewritten to behave the same way as RabbitMQ exchanges and queues. As a result, ConnectHandler, like any of the ConnectXxx methods, no longer creates a binding to the message exchange for the handler message type. The in-memory transport was also changed so that it cannot be shared by multiple bus instances: each bus created using the in-memory transport has its own message fabric.
To match the previous behavior, use the following code instead:
await bus.ConnectReceiveEndpoint("queue-name", x =>
{
x.Handler<SomeEvent>(...);
});
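Applied to the test from the question, this might look like the sketch below. It assumes that ConnectReceiveEndpoint returns a handle exposing a Ready task, as in more recent MassTransit releases; whether that matches 6.2.3 exactly should be checked against the installed version. The class name, test name, and "queue-name" are placeholders, and SomeEvent is declared with a property here purely for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using NUnit.Framework;

[TestFixture]
public class When_handler_is_connected_through_a_receive_endpoint
{
    [Test]
    public async Task Published_message_reaches_the_handler()
    {
        var someEvent = new SomeEvent { Data = 123 };
        var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();

        var bus = Bus.Factory.CreateUsingInMemory(config => { });
        await bus.StartAsync();
        try
        {
            // Connecting an endpoint (rather than a bare handler) creates a queue
            // and binds it to the SomeEvent exchange, so published messages reach it.
            var handle = bus.ConnectReceiveEndpoint("queue-name", x =>
            {
                x.Handler<SomeEvent>(c =>
                {
                    tcs.TrySetResult(c);
                    return Task.CompletedTask;
                });
            });

            // Assumption: the handle exposes Ready. Waiting for it avoids
            // publishing before the endpoint is bound.
            await handle.Ready;

            await bus.Publish(someEvent);

            // Fail the test if the handler has not run within 5 seconds.
            var finished = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(5)));
            Assert.That(finished, Is.SameAs(tcs.Task), "Handler was not invoked within 5 s");
            Assert.That(tcs.Task.Result.Message.Data, Is.EqualTo(someEvent.Data));
        }
        finally
        {
            await bus.StopAsync();
        }
    }

    public class SomeEvent
    {
        public int Data { get; set; }
    }
}
```

The handler completes the TaskCompletionSource directly instead of wrapping it in Task.Run, which keeps the consume pipeline from acknowledging the message before the result is set.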
For messages sent directly to the bus address, ConnectHandler works fine. Only published messages will not arrive, since the publish exchanges are no longer bound to the bus receive endpoint queue.
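To illustrate the difference between sending and publishing, the following sketch sends a message straight to the bus address and consumes it with ConnectHandler. It assumes GetSendEndpoint, bus.Address, and the Disconnect method on the returned handle behave as in other MassTransit versions; SendToBusAddressSketch and RunAsync are hypothetical names used only for this example.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public class SomeEvent
{
    public int Data { get; set; }
}

public static class SendToBusAddressSketch
{
    public static async Task<int> RunAsync()
    {
        var tcs = new TaskCompletionSource<ConsumeContext<SomeEvent>>();

        var bus = Bus.Factory.CreateUsingInMemory(config => { });
        await bus.StartAsync();
        try
        {
            // The handler is attached to the bus's own receive endpoint.
            var handle = bus.ConnectHandler<SomeEvent>(c =>
            {
                tcs.TrySetResult(c);
                return Task.CompletedTask;
            });

            // Send (not Publish) to the bus address, so no exchange binding is needed.
            var endpoint = await bus.GetSendEndpoint(bus.Address);
            await endpoint.Send(new SomeEvent { Data = 123 });

            // Give up after 5 seconds rather than waiting forever.
            var finished = await Task.WhenAny(tcs.Task, Task.Delay(TimeSpan.FromSeconds(5)));
            if (finished != tcs.Task)
                throw new TimeoutException("Handler was not invoked within 5 s");

            // Detach the handler again once it is no longer needed.
            handle.Disconnect();

            return tcs.Task.Result.Message.Data;
        }
        finally
        {
            await bus.StopAsync();
        }
    }
}
```

Replacing the Send call with bus.Publish in this sketch would be expected to reproduce the timeout described in the question, because nothing binds the publish exchange to the bus queue.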
Upvotes: 1