Reputation: 1104
Here's a simple .net core 1.1 console app. Call it with a -r parameter and it reads all the messages in the rabbitmq Queue, call it with any other parameters, and each parameter is enqueued as a message.
Here's the problem, I can enqueue the messages fine, but all attempts to read the messages result in no messages being read. Clearly I'm not consuming the queue correctly, and would appreciate some guidance.
Thanks!
using System;
using System.Collections.Generic;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace RabbitMqDemo
{
class Program
{
static void Main(string[] args)
{
var client = new MessagingClient();
if (args.Length == 1 && args[0].ToLower() == "-r")
{
Console.WriteLine("Reading Messages from Queue.");
var messages = client.ReceiveMessages();
Console.WriteLine($"Read {messages.Length} message(s) from queue.");
foreach(var msg in messages)
Console.WriteLine(msg);
}
else
{
foreach (var msg in args)
{
client.SendMessage(msg);
}
Console.WriteLine($"Enqueued {args.Length} Message.");
}
}
}
internal class MessagingClient
{
private readonly ConnectionFactory connectionFactory;
private string ExchangeName => "defaultExchange";
private string RoutingKey => "";
private string QueueName => "Demo";
private string HostName => "localhost";
public MessagingClient()
{
this.connectionFactory = new ConnectionFactory {HostName = this.HostName};
}
public void SendMessage(string message)
{
using (var connection = this.connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
this.QueueDeclare(channel, this.QueueName);
var properties = this.SetMessageProperties(channel, message);
string messageJson = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(messageJson);
channel.BasicPublish(exchange: this.ExchangeName, routingKey: this.RoutingKey, basicProperties: properties, body: body);
}
}
}
public string[] ReceiveMessages()
{
var messages = new List<string>();
using (var connection = this.connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
this.QueueDeclare(channel, this.QueueName);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
string bodystring = Encoding.UTF8.GetString(ea.Body);
messages.Add(bodystring);
// ReSharper disable once AccessToDisposedClosure
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: this.QueueName, autoAck: false, consumer: consumer);
}
}
return messages.ToArray();
}
private void QueueDeclare(IModel channel, string queueName)
{
channel.ExchangeDeclare(ExchangeName, type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null);
var queueDeclared = channel.QueueDeclare(queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queueName, ExchangeName, RoutingKey);
}
private IBasicProperties SetMessageProperties(IModel channel, object message)
{
var properties = channel.CreateBasicProperties();
properties.ContentType = "application/json";
properties.Persistent = true;
return properties;
}
}
}
Upvotes: 1
Views: 546
Reputation: 9637
ReceiveMessages()
and thus your reader probably returns immediately with an empty array before the event has a chance to fire. You have no code to wait while the consumer receives messages from RabbitMQ. Notice in the tutorial how Console.ReadLine()
is used. In your example, you can use a synchronization object (ManualResetEvent
) to prevent ReceiveMessages()
from returning until a certain message count is read.Upvotes: 1