Reputation: 1235
I use RabbitMQ as my message queue server with the .NET C# client. When an error occurs while processing a message from the queue, the message is not acknowledged and stays stuck in the queue; it is not processed again, which is not what I understood from the documentation.
I don't know whether I am missing some configuration or a block of code.
My idea now is to manually ack the message when an error occurs and manually push it onto the queue again.
I hope there is a better solution.
Thank you so much.
My code:
public void Subscribe(string queueName)
{
    while (!Cancelled)
    {
        try
        {
            if (subscription == null)
            {
                try
                {
                    // Try to open the connection
                    connection = connectionFactory.CreateConnection();
                }
                catch (BrokerUnreachableException ex)
                {
                    // You probably want to log the error and cancel after N tries,
                    // otherwise start the loop over to try to connect again after a second or so.
                    log.Error(ex);
                    continue;
                }
                // Create the channel
                channel = connection.CreateModel();
                // This instructs the channel not to prefetch more than one message
                channel.BasicQos(0, 1, false);
                // Create a new, durable exchange
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                // Create a new, durable queue
                channel.QueueDeclare(queueName, true, false, false, null);
                // Bind the queue to the exchange
                channel.QueueBind(queueName, exchangeName, queueName);
                // Create the subscription (noAck = false, so messages must be acked manually)
                subscription = new Subscription(channel, queueName, false);
            }
            BasicDeliverEventArgs eventArgs;
            var gotMessage = subscription.Next(250, out eventArgs); // wait up to 250 milliseconds
            if (gotMessage)
            {
                if (eventArgs == null)
                {
                    // This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue; // move on to the next iteration
                }
                // process message
                channel.BasicAck(eventArgs.DeliveryTag, false);
            }
        }
        catch (OperationInterruptedException ex)
        {
            log.Error(ex);
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}
private void DisposeAllConnectionObjects()
{
    // Dispose the subscription
    if (subscription != null)
    {
        // IDisposable is implemented explicitly for some reason.
        ((IDisposable)subscription).Dispose();
        subscription = null;
    }
    // Dispose the channel
    if (channel != null)
    {
        channel.Dispose();
        channel = null;
    }
    // Dispose the connection if there is one
    if (connection != null)
    {
        try
        {
            connection.Dispose();
        }
        catch (EndOfStreamException ex)
        {
            log.Error(ex);
        }
        catch (OperationInterruptedException ex) // can be thrown while disposing the connection
        {
            log.Error(ex);
        }
        catch (Exception ex)
        {
            log.Error(ex);
        }
        connection = null;
    }
}
Upvotes: 1
Views: 7610
Reputation: 141
I think you may have misunderstood the RabbitMQ documentation. A message that is never ack'ed by the consumer is not lost: the broker keeps it and requeues it for consumption once the consumer's channel or connection closes. While the channel stays open, though, the message simply remains unacknowledged, which is why it looks stuck. I don't think ack'ing the message and then republishing it yourself is a good idea; it will just make the problem more complex.
If you want to explicitly "reject" a message because the consumer had a problem processing it, you could use the Nack feature of Rabbit.
For example, within your catch exception blocks, you could use:
subscription.Model.BasicNack(eventArgs.DeliveryTag, false, true);
This tells the Rabbit broker to requeue the message. The arguments are the delivery tag, false to say the nack applies to this single message only rather than multiple messages, and true to requeue the message. If you want to reject the message and NOT requeue it, just change true to false.
Additionally, you have created a subscription, so I think you should perform your acks directly on it, not through the channel.
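To make the ack-or-nack decision concrete, here is a minimal sketch of how the processing step could be wrapped. The DeliveryHandler class and its Handle method are hypothetical names invented for this illustration, not part of the RabbitMQ client; the ack and nack delegates are assumed to have the same shape as the channel's BasicAck and BasicNack methods, so the logic can be tried without a broker.

```csharp
using System;

// Hypothetical helper for illustration only; not part of RabbitMQ.Client.
public static class DeliveryHandler
{
    // Runs process(). Acks the delivery on success; on any exception,
    // nacks it (requeueing by default). Returns true if the message was acked.
    public static bool Handle(
        ulong deliveryTag,
        Action process,
        Action<ulong, bool> ack,        // same shape as BasicAck(deliveryTag, multiple)
        Action<ulong, bool, bool> nack, // same shape as BasicNack(deliveryTag, multiple, requeue)
        bool requeueOnError = true)
    {
        try
        {
            process();
        }
        catch (Exception)
        {
            // Processing failed: reject this single message.
            nack(deliveryTag, false, requeueOnError);
            return false;
        }

        // The ack sits outside the try block so that a failure while
        // acking is not mistaken for a processing failure and nacked.
        ack(deliveryTag, false);
        return true;
    }
}
```

In the question's loop this might be called as DeliveryHandler.Handle(eventArgs.DeliveryTag, () => ProcessMessage(eventArgs), channel.BasicAck, channel.BasicNack), where ProcessMessage stands in for whatever the "process message" step does. Note that a message which fails every time will be redelivered over and over if it is always requeued, so you may want to pass requeueOnError as false when eventArgs.Redelivered is already true.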
Change:
channel.BasicAck(eventArgs.DeliveryTag, false);
To:
subscription.Ack();
This method of ack'ing is much cleaner since you are then keeping everything subscription-related on the subscription object, rather than messing around with the channel that you've already subscribed to.
Upvotes: 10