Reputation: 6462
I run the producer; it generates N messages, and I see them on the dashboard. When I run a receiver, it receives all the messages from the queue and the queue becomes empty.
/// <summary>
/// Connects to the broker, attaches a <see cref="MessageReceiver"/> to the given
/// queue with manual acknowledgements, and blocks until the user presses Enter.
/// </summary>
/// <param name="QueName">Name of the queue to consume from.</param>
static void Receive(string QueName)
{
    ConnectionFactory connectionFactory = new ConnectionFactory
    {
        HostName = HostName,
        UserName = UserName,
        Password = Password,
    };

    // Both the connection and the channel are disposable; the using statements
    // make sure they are closed when the method exits, including on an exception.
    using (var connection = connectionFactory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        // prefetchSize = 0, prefetchCount = 1, global = false
        channel.BasicQos(0, 1, false);

        MessageReceiver messageReceiver = new MessageReceiver(channel);

        // autoAck is false: the receiver acknowledges each delivery itself.
        channel.BasicConsume(QueName, false, messageReceiver);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
// Receiver
/// <summary>
/// Consumer that prints the metadata and UTF-8 payload of every delivery to the
/// console and then acknowledges it on the channel it was constructed with.
/// </summary>
public class MessageReceiver : DefaultBasicConsumer
{
    // Line printed before and after each message dump.
    private const string Separator = "------------------------------";

    private readonly IModel _channel;

    /// <summary>Creates a receiver that acknowledges deliveries on <paramref name="channel"/>.</summary>
    /// <param name="channel">Channel used to send the acknowledgement.</param>
    public MessageReceiver(IModel channel)
    {
        _channel = channel;
    }

    /// <summary>
    /// Logs the delivery details and the decoded body, then acks the single delivery.
    /// </summary>
    public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
    {
        // Copy the payload out of the buffer before decoding it as UTF-8 text.
        byte[] payload = body.ToArray();
        string text = Encoding.UTF8.GetString(payload);

        Console.WriteLine(Separator);
        Console.WriteLine("Consuming Message");
        Console.WriteLine($"Message received from the exchange {exchange}");
        Console.WriteLine($"Consumer tag: {consumerTag}");
        Console.WriteLine($"Delivery tag: {deliveryTag}");
        Console.WriteLine($"Routing tag: {routingKey}");
        Console.WriteLine($"Message: {text}");
        Console.WriteLine(Separator);

        // multiple = false: acknowledge only this delivery tag.
        _channel.BasicAck(deliveryTag, false);
    }
}
I need to have multiple producers that publish messages to the same queue, and multiple consumers that receive messages from that queue. Messages will be deleted by the queue TTL. But right now the first receiver gets all the messages from the queue. How can I do this?
Upvotes: 0
Views: 4342
Reputation: 55
We use an “exchange” here just to show the exchange mechanics in the same sample; it’s not really needed for the task (check the Worker2 project: it works with another queue, which is bound to the same exchange):
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
Full sample of consumption
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Timers;
/// <summary>
/// Sample worker that opens one connection and registers several consumers
/// (one channel each) on the same durable queue "test1", bound to the fanout
/// exchange "logs". Each consumer takes one unacknowledged message at a time.
/// </summary>
class Worker
{
    /// <summary>
    /// Sets up the consumers and blocks until the user presses Enter.
    /// </summary>
    public static void Main()
    {
        // Test of timer handler
        System.Timers.Timer aTimer = new System.Timers.Timer();
        aTimer.Elapsed += new ElapsedEventHandler((source, e)
            => Console.Write("Timer Test"));
        aTimer.Interval = 3000;
        // Test timer
        // aTimer.Enabled = true;

        var factory = new ConnectionFactory()
        {
            HostName = "localhost", UserName = "user", Password = "password",
            // DispatchConsumersAsync = true
        };

        // Dispose the connection when Main exits so it is not leaked.
        using (var connection = factory.CreateConnection())
        {
            // Add multiple consumers, so that queue can be processed "in
            // parallel"
            for (int i = 1; i < 10; i++)
            {
                // Copy the loop variable: the handler below captures it, and the
                // `for` variable itself is shared across iterations.
                var j = i;
                var channel = connection.CreateModel();
                channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
                var queueName = channel.QueueDeclare("test1", durable: true,
                    autoDelete: false, exclusive: false).QueueName;
                // take 1 message per consumer
                channel.BasicQos(0, 1, false);
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");
                Console.WriteLine($" [*] Waiting for messages in {j}");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine($" [x] Received in {j} -> {message} at {DateTime.Now}");
                    // Thread.Sleep(dots * 1000);
                    // await Task.Delay(3000);
                    // presumably simulates slow processing of the message
                    Thread.Sleep(10000);
                    // async works too
                    if (j == 5)
                    {
                        // Test special case of returning item to queue: in
                        // this case we received the message, but did not process
                        // it because of some reason.
                        // QOS is 1, so our consumer is already full. We need
                        // to return the message to the queue, so that another
                        // consumer can work with it
                        Console.WriteLine($"[-] CANT PROCESS {j} consumer! Error with -> {message}");
                        channel.BasicNack(deliveryTag: ea.DeliveryTag,
                            multiple: false, requeue: true);
                    }
                    else
                    {
                        Console.WriteLine($" [x] Done {j} -> {message} at {DateTime.Now}");
                        // here channel could also be accessed as
                        // ((EventingBasicConsumer)model).Model
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                            multiple: false);
                    }
                };
                channel.BasicConsume(queue: queueName, autoAck: false,
                    consumer: consumer);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
full example in link
Upvotes: 0
Reputation: 6462
The best solution is for every client to have its own queue, maybe with a TTL, maybe with an expiration parameter.
Upvotes: 1