Reputation: 3
Packages: MassTransit 3.1.2, MassTransit.Autofac 3.1.1, MassTransit.RabbitMQ 3.1.1, RabbitMQ.Client 3.6.0, Topshelf 3.3.1
A Topshelf Windows service creates a bus instance like this:
var builder = new ContainerBuilder();
builder.RegisterConsumers(Assembly.GetExecutingAssembly());
builder.Register<IBusControl>(context =>
{
    return Bus.Factory.CreateUsingRabbitMq(rbmq =>
    {
        var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
        {
            h.Username(BusConfig.Instance.UserName);
            h.Password(BusConfig.Instance.Password);
        });
        rbmq.UseJsonSerializer();
        rbmq.UseNLog();
        rbmq.ReceiveEndpoint(BusConfig.Instance.Queue, edp =>
        {
            edp.UseRetry(Retry.Incremental(5, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)));
            edp.LoadFrom(context);
        });
    });
}).SingleInstance().As<IBusControl>().As<IBus>();
return builder.Build().Resolve<IBusControl>();
A console application publishes a message like this:
var bus = Bus.Factory.CreateUsingRabbitMq(rbmq =>
{
    var host = rbmq.Host(new Uri("rabbitmq://" + BusConfig.Instance.Host + ":" + BusConfig.Instance.Port + "/" + BusConfig.Instance.VHost), h =>
    {
        h.Username(BusConfig.Instance.UserName);
        h.Password(BusConfig.Instance.Password);
    });
});
bus.Start();
var msg = new OrderCreatedMessage() { OrderId = 10102 };
bus.Publish<OrderCreatedMessage>(msg);
When the console application publishes the message, the RabbitMQ queue named by BusConfig.Instance.Queue receives the OrderCreatedMessage:
RabbitMQ queue view when the message is published
Then I started the Topshelf service, and the OrderCreatedMessage was automatically moved to the _skipped queue:
RabbitMQ queue view when the Topshelf service is started
The MassTransit log shows this:
MOVE:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue?prefetch=8:rabbitmq:\/\/192.168.12.217:5672\/zst\/zst.order.queue_skipped?bind=true&queue=zst.order.queue_skipped:N\/A:Moved
However, when I publish the message and consume it with the same bus (in the Topshelf service), it works.
Any help or additional insight on this architecture would be appreciated!
Upvotes: 0
Views: 4829
Reputation: 14436
I guess it's having problems creating the consumer for the OrderCreatedMessage message.
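Have you tried resolving the consumer on its own from the built container? (ContainerBuilder itself has no Resolve method, so build the container first.)
var container = builder.Build();
var test = container.Resolve<OrderCreatedMessageConsumer>();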
Upvotes: 1
Reputation: 3546
I'm just getting started with MassTransit and RabbitMQ. The answers above sent me in the right direction, but for future reference: I was getting extra queues in RabbitMQ because the exchange had incorrect (duplicate) bindings. I didn't notice this because the code seemed correct, while the incorrect settings persisted in RabbitMQ. Deleting the queues and starting fresh solved it.
Upvotes: 1
Reputation: 33233
Look at the bindings between the exchange for the published message type, and the exchange for the input queue of the service endpoint. Make sure that the proper type exchanges are bound correctly. Since the message is being delivered, I'm guessing that part is correct.
For the receive endpoint, it seems like the consumer was correct at one point (which explains why the binding exists) but is perhaps not consuming the correct message type currently. The message type must be the same message contract in the consumer and the publisher for the message to be consumed by the consumer.
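One way this mismatch can happen, assuming the console application and the service each declare their own OrderCreatedMessage class, is that the two classes live in different namespaces. MassTransit identifies a message by a URN built from the namespace and type name, so identical class names alone are not enough. The sketch below is only an illustration: the namespaces ConsoleApp.Messages and OrderService.Messages and the ContractCheck helper are hypothetical, and MessageUrn approximates the URN format for simple, non-generic, non-nested types rather than calling MassTransit itself.

```csharp
using System;

// Hypothetical copy of the contract as declared in the publishing console application.
namespace ConsoleApp.Messages
{
    public class OrderCreatedMessage { public int OrderId { get; set; } }
}

// Hypothetical copy of the contract as declared in the consuming Topshelf service.
namespace OrderService.Messages
{
    public class OrderCreatedMessage { public int OrderId { get; set; } }
}

public static class ContractCheck
{
    // Approximates the message URN for a simple type: urn:message:{Namespace}:{TypeName}.
    public static string MessageUrn(Type type)
    {
        return "urn:message:" + type.Namespace + ":" + type.Name;
    }

    // Two types count as the same contract only if their URNs match.
    public static bool SameContract(Type published, Type consumed)
    {
        return MessageUrn(published) == MessageUrn(consumed);
    }

    public static void Main()
    {
        Type published = typeof(ConsoleApp.Messages.OrderCreatedMessage);
        Type consumed = typeof(OrderService.Messages.OrderCreatedMessage);

        Console.WriteLine(MessageUrn(published));
        Console.WriteLine(MessageUrn(consumed));
        Console.WriteLine(SameContract(published, consumed)
            ? "Same contract: the consumer would receive the message."
            : "Different contracts: the message would be skipped.");
    }
}
```

If the two URNs differ, moving the message class into one shared assembly referenced by both the publisher and the service is the usual way to make them agree.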
When the message is moved to _skipped, there is no consumer on that endpoint actually consuming the message types in the message itself. I'd recommend posting the following output for review:
bus.GetProbeResult().ToJsonString()
This will show the consumers that are being registered and the message types which are being consumed. It will also help immensely in troubleshooting the issue you're seeing.
Upvotes: 5