Reputation: 61
I am working with MassTransit, RabbitMQ and .NET Core. I have a problem publishing and receiving messages: my UserEventHandler and UserCommandHandler classes are never called. Can anybody help me? The bus is started. Also, when I remove the UserEventHandler class from the registration, my breakpoint in the UserCommandHandler class is hit.
Here is my UserEventHandler class:
public class UserEventHandler : IConsumer<IUserEvent>
{
    private ConcertDbContext _context;
    public UserEventHandler(ConcertDbContext context)
    {
        _context = context;
    }
    public Task Consume(ConsumeContext<IUserEvent> context)
    {
        var concerts = _context.Concerts.Where(c => c.Name == context.Message.Name);
        foreach (var concert in concerts)
        {
            concert.SoldTickets = context.Message.TicketBuyed;
            _context.Concerts.Update(concert);
            _context.SaveChanges();
        }
        throw new NotImplementedException();
    }
}
Here is my UserCommandHandler class:
public class UserCommandHandler : IConsumer<IUserCommand>
{
    public async Task Consume(ConsumeContext<IUserCommand> context)
    {
        await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
    }
}
I registered UserCommandHandler in the Startup.cs class like this:
services.AddMassTransit(x =>
{
    x.AddConsumer<ConcertEventHandler>();
    x.AddConsumer<UserCommandHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
            hst =>
            {
                hst.Username(RabbitMqConstants.Username);
                hst.Password(RabbitMqConstants.Password);
            });
        cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<ConcertEventHandler>(provider);
        });
        cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserCommandHandler>(provider);
        });
        // or, configure the endpoints by convention
        cfg.ConfigureEndpoints(provider);
    }));
Here is where I registered the UserEventHandler class:
services.AddMassTransit(x =>
{
    x.AddConsumer<ConcertCommandHandler>();
    x.AddConsumer<UserEventHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
            hst =>
            {
                hst.Username(RabbitMqConstants.Username);
                hst.Password(RabbitMqConstants.Password);
            });
        cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<ConcertCommandHandler>(provider);
        });
        cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserEventHandler>(provider);
        });
        // or, configure the endpoints by convention
        cfg.ConfigureEndpoints(provider);
    }));
Upvotes: 1
Views: 1345
Reputation: 33278
Each service should use its own queues. Currently you are configuring each service with different consumers on the same receive endpoint queue names. Both services then read from the same queues, so a message can be delivered to a service instance that has no consumer for that message type, and it will be moved to the _skipped queue.
Also, remove the cfg.ConfigureEndpoints call at the end of each service configuration. You are already configuring your receive endpoints manually, so it will create duplicate queues and endpoints for your consumers.
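As a rough sketch of what that could look like, the fragment below gives each service its own queue and drops the ConfigureEndpoints call. It only uses calls that already appear in the question; the queue names "user-command-service" and "user-event-service" are hypothetical placeholders, and the concert consumers are left out for brevity (they would need their own per-service queues in the same way).

```csharp
// Fragment for Startup.ConfigureServices in each service. `services`,
// RabbitMqConstants and the consumer classes are taken from the question.
// The queue name strings are hypothetical; pick any names unique per service.

// Service that hosts UserCommandHandler
services.AddMassTransit(x =>
{
    x.AddConsumer<UserCommandHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), hst =>
        {
            hst.Username(RabbitMqConstants.Username);
            hst.Password(RabbitMqConstants.Password);
        });

        // Queue used only by this service
        cfg.ReceiveEndpoint(host, "user-command-service", e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserCommandHandler>(provider);
        });
        // cfg.ConfigureEndpoints(provider) is intentionally not called
    }));
});

// Service that hosts UserEventHandler (a separate application)
services.AddMassTransit(x =>
{
    x.AddConsumer<UserEventHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), hst =>
        {
            hst.Username(RabbitMqConstants.Username);
            hst.Password(RabbitMqConstants.Password);
        });

        // Different queue name from the command service
        cfg.ReceiveEndpoint(host, "user-event-service", e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserEventHandler>(provider);
        });
    }));
});
```

With separate queue names, each queue only ever receives message types that the service reading from it has a consumer for.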
Upvotes: 1