EGM

Reputation: 13

Trying to get the message from the MassTransit consumer

I'm using MassTransit and RabbitMQ to publish events (only a publisher, no consumer). I'm now trying to write an integration test that verifies the message has been published and, if so, that it is the right message. To do that, I create a consumer that reads the message from the queue and compare it with what I expected. The problem is that I can't consume the message: the event is published successfully, but I'm not able to get it.

This is the class responsible for connecting the consumer:

public class ServiceBusHelper
{

    private IBusControl bus;
    private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"];
    private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"];
    private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"];
    private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"];

    public ConnectHandle HandleObserver { get; set; }

    public void ConnectRabbitMQ()
    {
        bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(
                new Uri(serviceBusEndpoint),
                h =>
                {
                    h.Username(serviceBusUsername);
                    h.Password(serviceBusPassword);
                });

            cfg.ReceiveEndpoint(
            serviceBusQueueName,
            e =>
            {

                e.Consumer<ServiceBusEventsHelper>();

            });
        });

        //Observer
        var observer = new PublishedObserverHelper();
        HandleObserver = bus.ConnectPublishObserver(observer);

    }


}

And this is the class that will consume the messages:

public class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent>
{
    public ITransportCreatedEvent Result { get; set; }


    public async Task Consume(ConsumeContext<ITransportCreatedEvent> context)
    {
        Result = await Task.FromResult(context.Message);

    }


}

And in the test method I have this:

        ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper();
        ServiceBusHelper busHelper = new ServiceBusHelper();

        try
        {
            busHelper.ConnectRabbitMQ();
            transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest);
            handle = busHelper.HandleObserver;

            var eventResponse = eventHelper.Result; // Always null
        }
        catch (Exception ex)
        {
            Assert.Fail(ex.GetDetailMessage());
        }

I'm trying to read the received message like this:

var eventResponse = eventHelper.Result; // Always null

but it is always null.

Can somebody help me, please?

I have a service, and one of its methods is CreateTransportAsync(); inside that method I call Publish:

public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request)
{
    .
    .
    .
    await this.RaiseTransportCreatedEvent(transportResponseDto);
}

private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto)
{
    var evt = CreateTransportEvent(transportResponseDto);
    await this.serviceBus.Publish(evt).ConfigureAwait(false);
}

public class ServiceBus<T> : IServiceBus<T> where T : class
{
    private readonly IBus bus;

    public ServiceBus(IBus bus)
    {
        this.bus = bus;
    }

    public Task Publish(T evt)
    {
        return bus.Publish(evt, evt.GetType());
    }
}

This is how I publish the event, and it works. Now I'm trying to test that all of this works with an integration test in another solution, where I create a consumer to read the message from the queue. Then I want to verify that message (by comparing it with a fake message in a JSON file) to see if everything is OK. The problem is that I'm not able to get that message, and I can't understand what is happening. P.S.: I don't really understand what you are trying to say in point 3. Thanks.

Upvotes: 1

Views: 6249

Answers (1)

Alexey Zimarev

Reputation: 19640

  1. You need to start the bus by calling bus.Start(). You never do this, so nothing is received at all.
  2. It is unclear what transportClient.CreateTransportAsync does. Who is publishing the message?
  3. A consumer is instantiated per consumed message. In your "test" you create an instance of the consumer and keep a reference to it. Then you send a message somewhere. MassTransit creates a new instance of your consumer, that instance gets its Result updated, and then it is disposed. But you are checking the Result of the instance you originally created, which never receives any message, so it will always be null.
  4. Delivering a message from publisher to consumer takes time. You are trying to check the result straight after you initialise the bus. I am quite sure you will never get it that fast, even if you fix (1), (2) and (3).
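Point 3 can be seen without a broker. The following is a toy simulation in plain C#, not MassTransit code: RecordingConsumer and PerMessageDispatcher are hypothetical names, and the dispatcher only mimics the "new consumer instance per message" behaviour described above.

```csharp
using System;

// Stand-in for the consumer in the question; not a MassTransit type.
public class RecordingConsumer
{
    public string Result { get; private set; }

    public void Consume(string message)
    {
        Result = message;
    }
}

// Toy dispatcher that mimics "one consumer instance per message".
public class PerMessageDispatcher
{
    private readonly Func<RecordingConsumer> factory;

    public PerMessageDispatcher(Func<RecordingConsumer> factory)
    {
        this.factory = factory;
    }

    public RecordingConsumer Deliver(string message)
    {
        var consumer = factory();   // fresh instance created for this message
        consumer.Consume(message);
        return consumer;            // returned only so the demo can inspect it
    }
}

public static class Demo
{
    public static void Main()
    {
        // The instance the test creates and keeps a reference to.
        var held = new RecordingConsumer();

        // The dispatcher never sees "held"; it builds its own instances.
        var dispatcher = new PerMessageDispatcher(() => new RecordingConsumer());
        var used = dispatcher.Deliver("transport created");

        Console.WriteLine(held.Result == null);   // True
        Console.WriteLine(used.Result);           // transport created
    }
}
```

The instance held by the test is never the one that handles the message, which is why its Result stays null no matter how long the test waits.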

I am not sure what exactly you are trying to test. Publishing and consuming messages works over all transports with MassTransit. You can get any sample from GitHub, build it, run it and see it working.

There are also a number of tests for the RabbitMQ transport that show how you can create something like this. For example, check the ConsumerBind_Specs.cs file.

In addition, if you want to use an existing consumer instance, you can connect this instance to the bus as described in the documentation section "Connecting an existing consumer instance". Using e.Instance instead of e.Consumer will make your test work if you wait properly for the Consume method to finish. However, this is not a very popular approach, since you really want to limit your consumer's scope to handling a single message.
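A minimal sketch of that approach, under some assumptions: CapturingConsumer, WaitAsync and TestBus are hypothetical names, the empty ITransportCreatedEvent is only a placeholder so the snippet compiles (a real test should reference the publisher's actual contract type), and the two-argument ReceiveEndpoint call mirrors the code in the question, so it may need adjusting for other MassTransit versions.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Placeholder for the event contract; replace with the publisher's real type.
public interface ITransportCreatedEvent { }

// Consumer that exposes the first received message as an awaitable task.
public class CapturingConsumer : IConsumer<ITransportCreatedEvent>
{
    private readonly TaskCompletionSource<ITransportCreatedEvent> received =
        new TaskCompletionSource<ITransportCreatedEvent>();

    public Task Consume(ConsumeContext<ITransportCreatedEvent> context)
    {
        // Completes the task the test is waiting on; later messages are ignored.
        received.TrySetResult(context.Message);
        return Task.CompletedTask;
    }

    // Waits for a message, or throws if none arrives within the timeout.
    public async Task<ITransportCreatedEvent> WaitAsync(TimeSpan timeout)
    {
        var finished = await Task.WhenAny(received.Task, Task.Delay(timeout))
            .ConfigureAwait(false);
        if (finished != received.Task)
            throw new TimeoutException("No ITransportCreatedEvent arrived within " + timeout);
        return await received.Task.ConfigureAwait(false);
    }
}

public static class TestBus
{
    // Builds a bus that delivers to the given consumer instance, then starts it.
    public static IBusControl Start(Uri hostUri, string username, string password,
                                    string queueName, CapturingConsumer consumer)
    {
        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host(hostUri, h =>
            {
                h.Username(username);
                h.Password(password);
            });

            // e.Instance connects this exact object instead of creating one per message.
            cfg.ReceiveEndpoint(queueName, e => e.Instance(consumer));
        });

        bus.Start();   // without this, the receive endpoint never gets any message
        return bus;
    }
}
```

In the test, the idea would be to create one CapturingConsumer, pass it to TestBus.Start before calling CreateTransportAsync, and then await consumer.WaitAsync with a generous timeout (for example ten seconds) before comparing the message with the expected one. The bus should be stopped once the test finishes.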

Upvotes: 2
