Reputation: 3581
Since RabbitMQ version 3.5.0 priority queues are supported - https://www.rabbitmq.com/priority.html
A queue is declared as a priority queue by passing the x-max-priority argument when the queue is created.
I can successfully declare a queue with priority support
// Broker-level subscribe options: manual acknowledgement, plus the
// x-max-priority queue argument so the queue is declared with priority support.
brkrSub := broker.NewSubscribeOptions(
	broker.DisableAutoAck(),
	rabbitmq.QueueArguments(map[string]interface{}{"x-max-priority": 10}),
)

// Build the subscriber for "mytopic" on the "mytopic.hello" queue, handing the
// broker options over through the subscriber context.
sub := service.Server().NewSubscriber(
	"mytopic",
	h.Handle,
	server.SubscriberQueue("mytopic.hello"),
	server.SubscriberContext(brkrSub.Context),
)
service.Server().Subscribe(sub)
But how do I publish a message specifying a priority?
// Payload carrying a human-readable sequence number.
body := &message.MyTestMessage{
	Message: fmt.Sprintf("Message number %d", counter),
}
msg := client.NewMessage(
	topic,
	body,
	// TODO: Priority
)
if err := client.Publish(ctx, msg); err != nil {
	// Printf needs a verb for the error; without one the error is rendered
	// as "%!(EXTRA string=...)" instead of being formatted into the message.
	fmt.Printf("Cannot publish message: %v\n", err)
	return
}
I could not find a direct way of passing a priority as either a MessageOption or a PublishOption. However, it seems there is a way to specify additional options through the client.Publish context. Am I looking in the right direction, and if so, can you help me a little here?
Edit: I was able to do the following without causing any compile-time errors. The priority is still ignored, though, and messages are still coming in the usual fashion.
// setPriority returns a client.PublishOption that requests the given message
// priority from the RabbitMQ broker.
//
// The priority is recorded with the plugin's own rabbitmq.Priority option
// rather than under an ad-hoc string key such as "priority": a value stored
// under a key the broker plugin does not look up is silently ignored.
// NOTE(review): this relies on the client forwarding the publish context to
// the broker — confirm for the go-micro version in use.
//
// ctx is the parent context the option's context is derived from. priority is
// clamped to the uint8 range before conversion so out-of-range values do not
// wrap around.
func setPriority(ctx context.Context, priority int) client.PublishOption {
	if priority < 0 {
		priority = 0
	} else if priority > 255 {
		priority = 255
	}

	// Let the rabbitmq option write into broker publish options seeded with
	// ctx, then hand the resulting context to the client publish options.
	brokerOpts := broker.PublishOptions{Context: ctx}
	rabbitmq.Priority(uint8(priority))(&brokerOpts)
	return client.PublishContext(brokerOpts.Context)
}
// publish sends one MyTestMessage numbered counter to topic, passing the
// requested priority through setPriority, and prints the outcome to stdout.
// A publish failure is reported and the function returns without retrying.
func publish(ctx context.Context, priority int, counter int) {
	//body := fmt.Sprintf("hello, I am a message %d", counter)
	body := &message.MyTestMessage{
		Message: fmt.Sprintf("Message number %d", counter),
	}
	msg := client.NewMessage(
		topic,
		body,
	)
	if err := client.Publish(ctx, msg, setPriority(ctx, priority)); err != nil {
		// Printf needs a verb for the error; without one the error is rendered
		// as "%!(EXTRA string=...)" instead of being formatted into the message.
		fmt.Printf("Cannot publish message: %v\n", err)
		return
	}
	fmt.Printf("Published message %d to %s \n", counter, topic)
}
Upvotes: 1
Views: 517
Reputation: 11
var brokerOpts broker.PublishOptions
rabbitmq.Priority(uint8(10))(&brokerOpts)
event.Publish(ctx, payload, client.PublishContext(brokerOpts.Context))
Upvotes: 0
Reputation: 1118
Try something like this:
// publishMessageToChan publishes messageToQueue as a text/plain message on
// channel and returns the error reported by channel.Publish.
//
// The exchange and routing key are the literal placeholders "<exchange>" and
// "<queue>"; the queue parameter is not used by the body. The message
// priority is set through the Priority field of amqp.Publishing.
func publishMessageToChan(queue *amqp.Queue, channel *amqp.Channel, messageToQueue string) error {
	publishing := amqp.Publishing{
		Timestamp:   time.Now(),
		ContentType: "text/plain",
		Body:        []byte(messageToQueue),
		Priority:    0, // message priority goes here (0 to 9)
	}

	const (
		mandatory = false
		immediate = false
	)
	return channel.Publish("<exchange>", "<queue>", mandatory, immediate, publishing)
}
With library "github.com/streadway/amqp"
Upvotes: 0