DraganMilovac93

Reputation: 61

Cannot receive messages with MassTransit in .NET Core

I am working with MassTransit, RabbitMQ and .NET Core. I have a problem when I publish a message and try to receive it: my UserEventHandler and UserCommandHandler classes are never called. The bus is started. I also noticed that when I remove the UserEventHandler class from the registration, my breakpoint in the UserCommandHandler class is hit. Can anybody help me?

Here is my UserEventHandler class:

public class UserEventHandler : IConsumer<IUserEvent>
{
    private readonly ConcertDbContext _context;

    public UserEventHandler(ConcertDbContext context)
    {
        _context = context;
    }

    public async Task Consume(ConsumeContext<IUserEvent> context)
    {
        // Materialize the query first so the result set is not still being
        // read while the entities are modified.
        var concerts = _context.Concerts
            .Where(c => c.Name == context.Message.Name)
            .ToList();

        foreach (var concert in concerts)
        {
            concert.SoldTickets = context.Message.TicketBuyed;
            _context.Concerts.Update(concert);
        }

        // Persist all updated concerts in a single call.
        await _context.SaveChangesAsync();
    }
}

Here is my UserCommandHandler class:

public class UserCommandHandler : IConsumer<IUserCommand>
{
    public async Task Consume(ConsumeContext<IUserCommand> context)
    {
        await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
    }
}

I registered UserCommandHandler in the Startup.cs class like this:

        services.AddMassTransit(x =>
        {
            x.AddConsumer<ConcertEventHandler>();
            x.AddConsumer<UserCommandHandler>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
                    hst =>
                    {
                        hst.Username(RabbitMqConstants.Username);
                        hst.Password(RabbitMqConstants.Password);
                    });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<ConcertEventHandler>(provider);
                });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<UserCommandHandler>(provider);
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

Here is where I registered the UserEventHandler class:

        services.AddMassTransit(x =>
        {
            x.AddConsumer<ConcertCommandHandler>();
            x.AddConsumer<UserEventHandler>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
                    hst =>
                    {
                        hst.Username(RabbitMqConstants.Username);
                        hst.Password(RabbitMqConstants.Password);
                    });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<ConcertCommandHandler>(provider);
                });
                cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<UserEventHandler>(provider);
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

Upvotes: 1

Views: 1345

Answers (1)

Chris Patterson

Reputation: 33278

Each service should use its own queues. Currently, you are configuring the two services with different consumers on the same receive endpoint queue names. That causes messages to be moved to the _skipped queue whenever they are delivered to a service instance that has no consumer for the published message type.

Also, remove the cfg.ConfigureEndpoints call at the end of each service configuration. You are already configuring your receive endpoints manually, so ConfigureEndpoints creates duplicate queues and endpoints for your consumers.
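
A rough sketch of what that could look like, assuming the same MassTransit registration API used in the question. The two queue names are hypothetical placeholders (they are not from the original code); any names should work as long as the two services do not share them. Only the user-related endpoints are shown.

```csharp
// Service that hosts UserCommandHandler (inside Startup.ConfigureServices).
// "user-command-service" is a hypothetical queue name chosen for illustration.
const string userCommandQueue = "user-command-service";

services.AddMassTransit(x =>
{
    x.AddConsumer<UserCommandHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), hst =>
        {
            hst.Username(RabbitMqConstants.Username);
            hst.Password(RabbitMqConstants.Password);
        });

        // This queue is used only by this service.
        cfg.ReceiveEndpoint(host, userCommandQueue, e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserCommandHandler>(provider);
        });

        // No cfg.ConfigureEndpoints(provider): the endpoint above is configured manually.
    }));
});

// Service that hosts UserEventHandler (inside its own Startup.ConfigureServices).
// "user-event-service" is likewise a hypothetical queue name.
const string userEventQueue = "user-event-service";

services.AddMassTransit(x =>
{
    x.AddConsumer<UserEventHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), hst =>
        {
            hst.Username(RabbitMqConstants.Username);
            hst.Password(RabbitMqConstants.Password);
        });

        // A different queue name from the command service, so IUserEvent
        // messages are bound to a queue that actually has a consumer for them.
        cfg.ReceiveEndpoint(host, userEventQueue, e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserEventHandler>(provider);
        });
    }));
});
```

If commands are sent to a specific queue address rather than published, that address would need to match whichever queue name the command service ends up using.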

Upvotes: 1
