Reputation: 375
I have a simple RabbitMQ publisher and consumer code listed below.
First, I created 10 count of different message in My_Tasks queue. When I try to get these message, one by one and with autoAck flag as false, I can read the first message, but acknowledge could not be sent to the RabbitMQ server. I get an error written below;
Publisher;
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
var body = Encoding.UTF8.GetBytes(message);
var prop = channel.CreateBasicProperties();
prop.Persistent = true;
channel.BasicPublish("", routingKey: qName, prop, body);
}
}
Consumer;
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(qName, autoAck: false, consumer);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
}
}
RabbitMQ.Client.Exceptions.AlreadyClosedException: 'Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Application, code=200, text='Goodbye', classId=0, methodId=0'
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd) at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory 1 body) at RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple) at RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple) at RabbitMQ.Client.Impl.AutorecoveringModel.BasicAck(UInt64 deliveryTag, Boolean multiple) at QueueExample.Consumer.Program.<>c__DisplayClass0_0.b__0(Object model, BasicDeliverEventArgs ea) in D:\Projects\RabbitMQTutorial\QueueExample\QueueExample.Consumer\Program.cs:line 36 at RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliver(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, ReadOnlyMemory`1 body) at RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.<>c__DisplayClass10_0.b__0()
Thanks for your help
Upvotes: 5
Views: 11668
Reputation: 375
I found the answer of the error. My example application is an consoleApp so while consumer receive the message the connection was already closed. When a write a Connsole.ReadLine() for waiting response for closing the connection, all the message could read by the consumer.
That is the temporary solution, you can find your own solution from this perspective.
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(qName, autoAck: false, consumer);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
//Solution is here
Console.WriteLine("Press Any Key to Continue..");
Console.ReadLine();
}
}
Upvotes: 3
Reputation: 345
This should be a comment, but I am missing reputation.
Was your solution already working? Where do you define your connection to the broker? Are you using the broker in a docker-compose configuration? In this case, your broker must connect to
rabbitmq://host.docker.internal
When I started working with RabbitMQ in .Net Core I experienced the same issues. As my application (microservice pattern) was in an early stage I switched to MassTransit framework which handles everything. I can highly recommend it, since it "just works" and you can fully focus on your desired system functionality.
Upvotes: 1