Reputation: 3
I have one queue-publisher and one hundred queue-subscribers. Subscribers bind to one queue. The Rabbit`s guide said that to each queue Rabbit creates single thread. I want to send all messages to subscribers through one queue and save all unsended messages in the same queue if destination subscriber is offline. I have two solutions:
suscriber`s code
static void Main(string[] args)
{
List<string> serevities1 = new List<string>() { "qwerty.red" };
List<string> serevities2 = new List<string>() { "asdfgh.green" };
string exchange = "topic_logs";
//string direction = ExchangeType.Topic;
string direction = ExchangeType.Direct;
var consumer1 = new MqDll.MqConsumer(exchange, direction, serevities1);
MqDll.MqConsumer consumer2;
Task.Run(() => {
Thread.Sleep(10000);
consumer2 = new MqDll.MqConsumer(exchange, direction, serevities2);
});
Console.ReadLine();
}
public class MqConsumer
{
private IConnection connection;
private IModel channel;
//ExchangeType.Fanout
public MqConsumer(string exchange, string direction, List<string> severities = null)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(exchange, direction);
string queueName = "task_queue";
// queueName= channel.QueueDeclare().QueueName;
channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
Bind(queueName, exchange, severities);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += MsgReceived;
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Consumer created");
}
~MqConsumer()
{
channel.Close();
connection.Close();
}
private void Bind(string queuename, string exchange, List<string> severities)
{
if (severities != null)
{
severities.ForEach(x =>
{
channel.QueueBind(queue: queuename,
exchange: exchange,
routingKey: x);
});
}
else
{
channel.QueueBind(queue: queuename,
exchange: exchange,
routingKey: "");
}
}
private void MsgReceived(object model, BasicDeliverEventArgs ea)
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var rkey = ea.RoutingKey;
Console.WriteLine($" [x] Received {message} {rkey}");
Console.WriteLine($" [x] Done {DateTime.Now} | {this.GetHashCode()}");
}
}
publisher`s code
static void Main(string[] args)
{
List<string> serevities = new List<string>() { "qwerty.red", "asdfgh.green" };
string exchange = "topic_logs";
//string direction = ExchangeType.Topic;
string direction = ExchangeType.Direct;
var publisher = new MqDll.MqPublisher(exchange, direction);
Console.WriteLine("Publisher created");
var msg = Console.ReadLine();
while (msg != "q")
{
serevities.ForEach(x =>
{
publisher.Publish("SomeMsg..", "topic_logs", x);
});
msg = Console.ReadLine();
}
}
public class MqPublisher
{
private IConnection connection;
private IModel channel;
public MqPublisher(string exchange, string type)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.ExchangeDeclare(exchange, type);
}
~MqPublisher()
{
channel.Close();
connection.Close();
}
public void Publish(string msg, string exchange = "logs", string severity = "", bool isPersistent = true)
{
var properties = channel.CreateBasicProperties();
properties.Persistent = isPersistent;
channel.BasicPublish(exchange: exchange,
routingKey: severity,
basicProperties: properties,
body: Encoding.UTF8.GetBytes(msg));
Console.WriteLine(" [x] Sent {0}", msg);
}
}
Is there a way to combine these two solutions and make one queue to all subscribers, bind subscribers to unique routing key and save messages if direct subscriber (bound to direct routing key) is offline?
Upvotes: 0
Views: 234
Reputation: 94
Have you considered maybe a DB for users that are not online..
Queue => consumer is offline => send to DB Consumer is online => Publisher checks db for any messages that might have been missed => then directs them to Consumber
Upvotes: 1