Reputation: 48
I have a simple question, but I can't find an answer to it:
In a RabbitMQ queue with multiple consumers and a single publisher, is it possible for the publisher to stop sending messages to the other consumers until the first one processes the message and sends the ack to the publisher?
What I want to achieve:
Queue -> Q1
Sender -> S1
Consumers -> C1 - C2
No matter what I tried (single active consumer | reject and nack instead of ack | BasicGet instead of BasicConsume), it doesn't seem to do what I have in mind.
Am I trying to achieve something that is impossible in RabbitMQ, or am I doing something wrong?
Code of the sender:
public static ConnectionFactory _FACTORY = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost" };
public static IDictionary<string, object> _ARGUMENTS = new Dictionary<string, object>();

public static void Main(string[] args)
{
    IConnection connection = _FACTORY.CreateConnection();
    IModel channel = connection.CreateModel();
    SendMessagesQueue(channel);
    channel.Close();
    connection.Close();
}

public static void SendMessagesQueue(IModel channel)
{
    Console.WriteLine("QUEUE");
    int i = 0;
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true;
    channel.BasicQos(0, 2, true);
    while (Console.ReadLine() != null)
    {
        i++;
        var message = DateTime.Now + " - " + i;
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish("", "queue", properties, body);
        Console.WriteLine(" [x] Sent {0}", message);
    }
}
Code of the receivers (they are the same):
public static ConnectionFactory _FACTORY = new ConnectionFactory() { HostName = "localhost" };
public static IDictionary<string, object> _ARGUMENTS = new Dictionary<string, object>();
public static int id = 0;
public static IConnection connection = null;

public static async Task Main(string[] args)
{
    IConnection connection = _FACTORY.CreateConnection();
    IModel channel = connection.CreateModel();
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    ReceiveMessagesQueue(channel, consumer);
    channel.Close();
    connection.Close();
}

public static void ReceiveMessagesQueue(IModel channel, EventingBasicConsumer consumer)
{
    Console.WriteLine("QUEUE");
    while (Console.ReadLine() != null)
    {
        Console.WriteLine(" [*] Waiting for messages.");
        consumer.Received += (sender, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            Console.WriteLine("RECEIVED " + message + " - SLOW - AT " + DateTime.Now);
            Thread.Sleep(200000000); //remove it for the fast ack
            channel.BasicAck(ea.DeliveryTag, false);
            Console.WriteLine("ACK SENT " + DateTime.Now);
            Console.WriteLine("\n");
        };
        channel.BasicGet("queue", false);
    };
}
Upvotes: 0
Views: 688
Reputation: 9627
"Is it possible for the publisher to stop sending messages to the other consumers until the first one processes the message and sends the ack to the publisher?"
Yes. Here is one way:
If you need further assistance, provide a git repository with your complete code and open a discussion here where we can continue work on this:
https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions
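For readers who want something concrete to start from, here is a minimal consumer sketch. It is not necessarily the approach meant above. It assumes the 6.x RabbitMQ.Client API (the IModel interface used in the question), a broker on localhost that supports single active consumer (RabbitMQ 3.8 or later), and a queue named "queue" that either does not exist yet or was already declared with the same arguments. The idea is to declare the queue with the "x-single-active-consumer" argument, limit the channel to one unacknowledged message with BasicQos, and register the consumer once with BasicConsume rather than polling with BasicGet.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class SingleActiveConsumerSketch
{
    public static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Assumption: "queue" is not already declared with different arguments;
        // otherwise the broker rejects this declaration with PRECONDITION_FAILED.
        var arguments = new Dictionary<string, object>
        {
            { "x-single-active-consumer", true }
        };
        channel.QueueDeclare("queue", durable: true, exclusive: false,
                             autoDelete: false, arguments: arguments);

        // At most one unacknowledged message is delivered on this channel.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine("RECEIVED " + message + " AT " + DateTime.Now);
            Thread.Sleep(5000); // stand-in for slow processing
            channel.BasicAck(ea.DeliveryTag, multiple: false);
            Console.WriteLine("ACK SENT " + DateTime.Now);
        };

        // Register the consumer once; the broker pushes messages to the handler.
        channel.BasicConsume("queue", autoAck: false, consumer: consumer);

        Console.WriteLine("Press Enter to exit.");
        Console.ReadLine();
    }
}
```

If two copies of this program run against the same queue, the broker should deliver to only one of them at a time, and the next message arrives only after the previous one is acked. The second copy takes over if the first is cancelled or disconnects. The publisher needs no change for this, since BasicQos only affects delivery to consumers.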
Upvotes: 1