Reputation: 11853
Here is what I have so far. I've been able to successfully publish a message to a queue and see that it is there via RabbitMQ's management console.
However, when I try to receive it, it does not seem to trigger the callback function at all.
Here is the relevant code.
MessageQueue mq = new MessageQueue();
mq.Receive("task_queue", (model, ea) => {
    var message = Encoding.UTF8.GetString(ea.Body);
    System.Diagnostics.Debug.WriteLine(message);
});
Here is my Receive function in the MessageQueue class:
public void Receive(string queueName, EventHandler<BasicDeliverEventArgs> onReceived)
{
    using (IConnection connection = GetConnection(myLocalhost))
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            // Don't dispatch a new message to a consumer until it has processed and acknowledged the previous one.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
            var consumer = new EventingBasicConsumer(channel); // non-blocking
            // Set the event to be executed when receiving a message
            consumer.Received += onReceived;
            // Register a consumer to listen to a specific queue.
            channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
        }
    }
}
When I run the Receive function while there is something in the queue, nothing is printed to my output window.
Can anyone help me on this?
UPDATE
I took the code in the Receive function and placed it in the same file as the code that calls it. Still no luck. That rules out a scoping issue, I think. I also tried setting the Received event to an actual event delegate (instead of the onReceived parameter) and had that call another function in which I put a breakpoint. That function is never hit, leading me to believe that my event delegate callback is never being called at all.
I'm at a loss as to why this is. The message is still being consumed from the queue as the RabbitMQ management console shows me. I've also tried renaming the queue to something else to make sure no other phantom services are consuming from the same queue. No cigar.
UPDATE 2
I tried extracting the two using statements and calling my Receive function inside them in order to keep the scope, but that didn't work either. I even extracted the code in the whole Receive block out to a main function, and now it doesn't even consume from the queue.
Upvotes: 1
Views: 2165
Reputation: 16157
Looking at your code above, you have a pretty straightforward problem.
channel.BasicConsume only registers the consumer and returns right away; it does not block waiting for messages. So the instant after you call it, the whole thing (connection/channel) goes out of scope and is immediately disposed/destroyed via the using statements, before your Received handler gets a chance to run.
To prevent this from happening, you need to have an infinite loop immediately following the channel.BasicConsume call, with appropriate logic of course to exit when you shut down the program.
// _isRunning is a flag you set to false when the program shuts down.
while (_isRunning && channel.IsOpen) {
    Thread.Sleep(1);
    // Other application logic here; e.g. periodically break out of the
    // loop to prevent unacknowledged messages from accumulating in the system
    // (if you don't, they will eventually build up)
}
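If you would rather not spin in a loop, another option is to keep the connection and channel alive as fields and dispose them only when you are done. The following is just a minimal sketch, not your actual class: the constructor, the Dispose method, and the use of ConnectionFactory in place of your GetConnection helper are my assumptions. It also assumes a RabbitMQ.Client 5.x release, where ea.Body is a byte[]; on 6.0 and later Body is a ReadOnlyMemory<byte>, so you would need ea.Body.ToArray() instead.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical wrapper: holds the connection and channel as fields so they
// outlive the Receive call and are only closed when the wrapper is disposed.
public sealed class MessageQueue : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public MessageQueue(string hostName)
    {
        var factory = new ConnectionFactory { HostName = hostName };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
    }

    public void Receive(string queueName, EventHandler<BasicDeliverEventArgs> onReceived)
    {
        _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += onReceived;

        // Returns immediately; deliveries arrive on a client library thread
        // for as long as _channel and _connection stay open.
        _channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    }

    public void Dispose()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        using (var mq = new MessageQueue("localhost"))
        {
            mq.Receive("task_queue", (model, ea) =>
            {
                Console.WriteLine(Encoding.UTF8.GetString(ea.Body));
            });

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine(); // keeps the process, and the connection, alive
        }
    }
}
```

The point is the same as with the loop: the using block now wraps the whole lifetime of the consumer rather than just the call that registers it.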
Upvotes: 3