Reputation: 763
I have a problem configuring a fault consumer in my app. The consumed message is moved to an *_error_skipped queue instead of disappearing entirely.
Below is a very simple example. The client app receives the failed message and it disappears from the test_error queue, but it still exists in the test_error_skipped queue.
Service project
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;

namespace MassTransitTest.Service
{
    public class RequestModel
    {
        public DateTime RequestTime { get; set; }
    }

    class MassTransitService : IDisposable
    {
        private readonly IBusControl _busControl;

        public MassTransitService()
        {
            _busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
            {
                var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                configure.ReceiveEndpoint(host, "test", c =>
                {
                    c.UseRetry(r => r.None());
                    c.Consumer<RequestConsumer>();
                });
            });

            TaskUtil.Await(_busControl.StartAsync());
            Console.WriteLine("bus started");
        }

        public void Dispose()
        {
            _busControl?.StopAsync().Wait();
        }
    }

    class RequestConsumer : IConsumer<RequestModel>
    {
        public Task Consume(ConsumeContext<RequestModel> context)
        {
            Console.WriteLine($"request with message id {context.MessageId} received: {context.Message.RequestTime}");
            throw new NotImplementedException();
        }
    }
}
Client project
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using MassTransit;
using MassTransit.Util;
using MassTransitTest.Service;

namespace MassTransitTest.Client
{
    class MassTransitClient
    {
        private readonly IBusControl _busControl;

        public MassTransitClient()
        {
            _busControl = Bus.Factory.CreateUsingRabbitMq(configure =>
            {
                var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                configure.ReceiveEndpoint(host, "test_error", c =>
                {
                    c.Consumer<ErrorConsumer>();
                });
            });

            TaskUtil.Await(_busControl.StartAsync());
            Console.WriteLine("bus started");
        }

        public async Task Send()
        {
            Console.WriteLine("sending request");
            await (await _busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/mt_test/test"))).Send(new RequestModel()
            {
                RequestTime = DateTime.Now
            });
            Console.WriteLine("request sent");
        }
    }

    class ErrorConsumer : IConsumer<Fault<RequestModel>>
    {
        public Task Consume(ConsumeContext<Fault<RequestModel>> context)
        {
            Console.WriteLine($"request with message id {context.Message.FaultedMessageId} failed. requested time: {context.Message.Message.RequestTime}");
            return Task.CompletedTask;
        }
    }
}
I'm using .NET Core 2.1.1 and MassTransit 5.1.3.
Upvotes: 3
Views: 11685
Reputation: 33233
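To answer your question, which has several parts, first:
MassTransit receive endpoints move the message to the _error queue when an exception is thrown by the consumer. Creating a receive endpoint on the _error queue is not recommended and should not be done. The message moved there is the original RequestModel, not a Fault<RequestModel>, so an endpoint that only has a fault consumer has nothing to handle it and moves it on to the _skipped queue, which is why it shows up in test_error_skipped.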
If you simply want to observe whether a fault occurred in the consumer, you can create a separate receive endpoint (such as fault-queue) and register your Fault<T> consumer on it. MassTransit will publish a message that implements Fault<T>, which the broker will route to your consumer via the receive endpoint.
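As a rough sketch of that fault observer, assuming the same host settings and RequestModel contract as in the question: the queue name fault-queue and the FaultObserver and FaultObserverBus names are only illustrative, and the contract is repeated here so the snippet stands on its own.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

namespace MassTransitTest.Service
{
    // Message contract from the question, repeated so the sketch is self-contained.
    public class RequestModel
    {
        public DateTime RequestTime { get; set; }
    }
}

namespace MassTransitTest.Client
{
    using MassTransitTest.Service;

    // Hypothetical name: consumes the Fault<RequestModel> that MassTransit
    // publishes when the RequestModel consumer throws.
    class FaultObserver : IConsumer<Fault<RequestModel>>
    {
        public Task Consume(ConsumeContext<Fault<RequestModel>> context)
        {
            Console.WriteLine($"request {context.Message.FaultedMessageId} failed, requested time: {context.Message.Message.RequestTime}");

            // Each ExceptionInfo describes one exception thrown by the faulted consumer.
            foreach (var exception in context.Message.Exceptions)
                Console.WriteLine($"  {exception.ExceptionType}: {exception.Message}");

            return Task.CompletedTask;
        }
    }

    // Hypothetical helper that builds the client bus.
    static class FaultObserverBus
    {
        public static IBusControl Create()
        {
            return Bus.Factory.CreateUsingRabbitMq(configure =>
            {
                var host = configure.Host(new Uri("rabbitmq://localhost/mt_test"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // A dedicated queue for faults (assumed name), not the test_error queue.
                configure.ReceiveEndpoint(host, "fault-queue", c =>
                {
                    c.Consumer<FaultObserver>();
                });
            });
        }
    }
}
```

With this in place, test_error is left alone: the failed RequestModel stays there for inspection, while the published fault is delivered to fault-queue and consumed.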
However, based on your example above, you're sending a request and expecting the client to know if a fault occurred. For this, I'd recommend using the request client, which sets up the message headers to return faults to the request originator. It also allows responses to be sent. If you don't want to wait for the response, or wait to see whether a fault occurred, the fault observer described above is your best option.
You can see how to use the request client in the documentation.
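For orientation, a minimal sketch of the request client as I'd expect it to look on MassTransit 5.1.x, using MessageRequestClient. ResponseModel, RequestClientSketch and the 10-second timeout are assumptions made up for the example, and the service-side consumer would need to call context.RespondAsync(...) with a ResponseModel for the happy path to complete. Later MassTransit versions expose a different request client API, so check the documentation for your version.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

namespace MassTransitTest.Service
{
    // Message contract from the question, repeated so the sketch is self-contained.
    public class RequestModel
    {
        public DateTime RequestTime { get; set; }
    }

    // Hypothetical response contract; the question does not define one.
    public class ResponseModel
    {
        public DateTime HandledTime { get; set; }
    }
}

namespace MassTransitTest.Client
{
    using MassTransitTest.Service;

    // Hypothetical helper showing a request that observes faults.
    static class RequestClientSketch
    {
        public static async Task SendAndWait(IBus bus)
        {
            // The address is the service queue from the question; the timeout is an assumption.
            IRequestClient<RequestModel, ResponseModel> client =
                new MessageRequestClient<RequestModel, ResponseModel>(
                    bus,
                    new Uri("rabbitmq://localhost/mt_test/test"),
                    TimeSpan.FromSeconds(10));

            try
            {
                ResponseModel response = await client.Request(
                    new RequestModel { RequestTime = DateTime.Now },
                    CancellationToken.None);

                Console.WriteLine($"request handled at {response.HandledTime}");
            }
            catch (RequestFaultException ex)
            {
                // The consumer threw; the fault was returned to this client.
                Console.WriteLine($"request failed: {ex.Message}");
            }
            catch (RequestTimeoutException)
            {
                // Neither a response nor a fault arrived within the timeout.
                Console.WriteLine("request timed out");
            }
        }
    }
}
```

The bus passed in must be started and needs no test_error endpoint; the fault comes back on the bus's own temporary queue because the request client sets the response and fault addresses for you.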
Upvotes: 8