Reputation: 1984
RabbitMQ 3.5 now supports message priority; however, I am unable to build a working example. I've placed my code below, along with the output I expect and the output I actually get. I'd be interested in more documentation and/or a working example.
So my question in short: How do I get message priority to work in RabbitMQ 3.5.0.0?
Publisher:
using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;
class Publisher
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary<String, Object> args = new Dictionary<String, Object>();
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);
                for (int i = 1; i <= 10; i++)
                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.Priority = Convert.ToByte(i);
                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}
Consumer:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;
namespace Consumer
{
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    IDictionary<String, Object> args = new Dictionary<String, Object>();
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
                    channel.BasicConsume("task_queue1", false, "", args, consumer);
                    Console.WriteLine(" [*] Waiting for messages. " +
                                      "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }
}
Actual output:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10
Expected output:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1
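For anyone comparing the two outputs, here is a rough sketch of the ordering rules as I understand them: higher priority first, first-in first-out among equal priorities, priorities above the queue maximum capped at the maximum, and plain FIFO when the queue has no maximum priority at all. It is a toy in-memory model in JavaScript, not the RabbitMQ client API; `drainOrder` and `published` are made-up names for illustration.

```javascript
// Toy in-memory model of queue ordering; this is NOT the RabbitMQ client API.
// maxPriority === undefined models a queue declared without "x-max-priority".
function drainOrder(messages, maxPriority) {
  // Remember arrival order so equal priorities stay first-in, first-out.
  const indexed = messages.map((m, arrival) => {
    const requested = m.priority === undefined ? 0 : m.priority;
    // Without a max priority the queue is plain FIFO, so every message ranks equal.
    const effective = maxPriority === undefined ? 0 : Math.min(requested, maxPriority);
    return { body: m.body, effective, arrival };
  });
  indexed.sort((a, b) => b.effective - a.effective || a.arrival - b.arrival);
  return indexed.map((m) => m.body);
}

// Same ten messages as the publisher above: "Message i" with priority i.
const published = [];
for (let i = 1; i <= 10; i++) {
  published.push({ body: 'Message ' + i, priority: i });
}

console.log(drainOrder(published, undefined)); // arrival order, like the actual output
console.log(drainOrder(published, 10));        // priority order, like the expected output
```

The first call mirrors the actual output (the queue behaves as if no maximum priority had been set), the second mirrors the expected output.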
UPDATE #1. I found an example in Java here. However, it targets the RabbitMQ 3.4.x plugin that was incorporated into 3.5. The only difference I can see is that it expresses the priority as an int whereas mine is a byte, but I suspect that's a red herring. I'm at a bit of a loss here.
Upvotes: 10
Views: 10455
Reputation: 27852
Another possibility (for future searchers)
The "Push" method of message delivery doesn't seem to respect Priority.
http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html
The below is a quote from the URL above. I've bolded the important part.
Retrieving Messages By Subscription ("push API")
Another way to receive messages is to set up a subscription using the IBasicConsumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively. One way to implement a consumer is to use the convenience class EventingBasicConsumer, which dispatches deliveries and other consumer lifecycle events as C# events:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
    {
        var body = ea.Body;
        // ... process the message
        ch.BasicAck(ea.DeliveryTag, false);
    };
String consumerTag = channel.BasicConsume(queueName, false, consumer);
After changing to the "pull" method, priority seems to be respected. However, according to the quote below (from the same URL as above), there is a trade-off (which I've bolded).
Fetching Individual Messages ("pull API")
To retrieve individual messages, use IModel.BasicGet. The returned value is an instance of BasicGetResult, from which the header information (properties) and message body can be extracted:
bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
    // No message available at this time.
} else {
    IBasicProperties props = result.BasicProperties;
    byte[] body = result.Body;
    ...
Since noAck = false above, you must also call IModel.BasicAck to acknowledge that you have successfully received and processed the message:
    ...
    // acknowledge receipt of the message
    channel.BasicAck(result.DeliveryTag, false);
}
Note that fetching messages using this API is relatively inefficient. If you'd prefer RabbitMQ to push messages to the client, see the next section.
(The "next" section in this case takes you to the "push" method at the top of this post)
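One plausible reason push delivery can look like it ignores priority: a message that can be handed to a waiting consumer immediately never sits in the queue, so there is nothing for it to be reordered against. The sketch below is a toy in-memory model in JavaScript under that assumption; `ToyBroker` and `publishFive` are hypothetical names, and none of this is the RabbitMQ client API.

```javascript
// Toy model, NOT the RabbitMQ client API: messages go straight to the consumer
// while it has spare prefetch capacity; only messages that wait get prioritised.
class ToyBroker {
  constructor() {
    this.waiting = [];   // messages sitting in the queue
    this.delivered = []; // bodies in the order the consumer received them
    this.capacity = 0;   // how many more unacked messages the consumer accepts
    this.seq = 0;        // arrival counter, used to keep FIFO among equals
  }
  publish(body, priority) {
    this.waiting.push({ body, priority, seq: this.seq++ });
    this.dispatch();
  }
  subscribe(prefetch) {
    this.capacity = prefetch;
    this.dispatch();
  }
  ack() {
    this.capacity += 1;
    this.dispatch();
  }
  dispatch() {
    while (this.capacity > 0 && this.waiting.length > 0) {
      // Highest priority first, oldest first among equal priorities.
      this.waiting.sort((a, b) => b.priority - a.priority || a.seq - b.seq);
      this.delivered.push(this.waiting.shift().body);
      this.capacity -= 1;
    }
  }
}

// Publish five messages whose priority equals their number.
function publishFive(broker) {
  for (let i = 1; i <= 5; i++) broker.publish('Message ' + i, i);
}

// Consumer attached first with unlimited prefetch: nothing ever waits.
const pushFirst = new ToyBroker();
pushFirst.subscribe(Infinity);
publishFive(pushFirst);
console.log(pushFirst.delivered.join(', ')); // arrival order

// Messages published first: they all wait, so priority decides the order.
const backlog = new ToyBroker();
publishFive(backlog);
backlog.subscribe(Infinity);
console.log(backlog.delivered.join(', ')); // priority order
```

In this model a small prefetch with manual acks lets a backlog build up, at which point priority starts to matter again. Whether that matches a given real setup is something to verify against the broker.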
Upvotes: 0
Reputation: 305
A similar RabbitMQ priority queue implementation in Node.js
Install amqplib
To run the test, amqplib must be installed:
npm install amqplib
Publisher (send.js)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');

// Log the error and, if a connection is open, close it before exiting.
function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);
  // name of queue
  var q = 'hello';
  var msg = 'Hello World!';
  var priorityValue = 0;

  function on_channel_open(err, ch) {
    if (err !== null) return bail(err, conn);
    // maxPriority : max priority value supported by queue
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);
      for (var index = 1; index <= 100; index++) {
        // random priority between 0 and 9
        priorityValue = Math.floor((Math.random() * 10));
        msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
        // Buffer.from replaces the deprecated new Buffer(...) constructor
        ch.publish('', q, Buffer.from(msg), {priority: priorityValue});
        console.log(" [x] Sent '%s'", msg);
      }
      ch.close(function() { conn.close(); });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);
Subscriber (receive.js)
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');

// Log the error and, if a connection is open, close it before exiting.
function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);
  process.once('SIGINT', function() { conn.close(); });
  var q = 'hello';

  function on_channel_open(err, ch) {
    if (err !== null) return bail(err, conn);
    // must match the arguments the publisher declared the queue with
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);
      ch.consume(q, function(msg) { // message callback
        console.log(" [x] Received '%s'", msg.content.toString());
      }, {noAck: true}, function(_consumeOk) { // consume callback
        console.log(' [*] Waiting for messages. To exit press CTRL+C');
      });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);
Run:
node send.js
It will create a queue named 'hello' and fill it with 100 sample messages using the default AMQP exchange.
node receive.js
It will act as a consumer to subscribe to messages waiting in the queue.
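To check the result without eyeballing 100 lines, something like the following could be applied to the received bodies. It is a small sketch, assuming the bodies keep the "Hello World! <index> <priority>" shape built in send.js and that send.js finished before receive.js started, so every message waited in the queue; `priorityOf` and `isPriorityOrdered` are hypothetical helpers, not part of amqplib.

```javascript
// Hypothetical helper, not part of amqplib: pull the trailing priority out of
// bodies shaped like "Hello World! <index> <priority>" as built in send.js.
function priorityOf(body) {
  const parts = body.trim().split(' ');
  return Number(parts[parts.length - 1]);
}

// True when no message is followed by one of a higher priority.
function isPriorityOrdered(bodies) {
  for (let i = 1; i < bodies.length; i++) {
    if (priorityOf(bodies[i]) > priorityOf(bodies[i - 1])) return false;
  }
  return true;
}

// Example bodies in the same format; these values are made up for illustration.
const sample = ['Hello World! 7 9', 'Hello World! 2 9', 'Hello World! 1 3'];
console.log(isPriorityOrdered(sample));
```

Collecting `msg.content.toString()` into an array inside the consume callback and passing it to `isPriorityOrdered` would be one way to use it.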
Upvotes: 2
Reputation: 1984
Well I solved it. It was a dumb mistake. I wrote:
args.Add(" x-max-priority ", 10);
It should have been
args.Add("x-max-priority", 10);
I'll leave this up so other people can have a working example of RabbitMQ 3.5's priority queues in C#.
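Since the padded key is simply a different string from "x-max-priority", the declaration presumably went through without the queue ever becoming a priority queue. A cheap guard against this kind of typo is to check argument keys for stray whitespace before declaring the queue. The sketch below is in JavaScript and uses hypothetical helper names (`findPaddedKeys`, `trimKeys`); it is not part of any RabbitMQ client library.

```javascript
// Hypothetical pre-flight check, not part of any RabbitMQ client library:
// report queue-argument keys that carry leading or trailing whitespace.
function findPaddedKeys(args) {
  return Object.keys(args).filter((key) => key !== key.trim());
}

// Return a copy of the arguments with whitespace trimmed from every key.
function trimKeys(args) {
  const cleaned = {};
  for (const [key, value] of Object.entries(args)) {
    cleaned[key.trim()] = value;
  }
  return cleaned;
}

const broken = { ' x-max-priority ': 10 };
console.log(findPaddedKeys(broken)); // lists the padded key
console.log(trimKeys(broken));       // same value under the trimmed key
```

Failing loudly when `findPaddedKeys` returns anything is probably safer than silently trimming, because it points at the typo in the source.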
Upvotes: 15