Reputation: 5397
I am trying to write a RabbitMQ Consumer in Go. Which is suppose to take the 5 objects at a time from the queue and process them. Moreover, it is suppose to acknowledge if successfully processed else send to the dead-letter queue for 5 times and then discard, it should be running infinitely and handling the cancellation event of the consumer. I have few questions :
BasicConsumer
vs EventingBasicConsumer
in RabbitMq-go Reference?Model
in RabbitMQ and is it there in RabbitMq-go?ttl
consumerTag
argument in the ch.Consume
function in the below codechannel.Get()
or channel.Consume()
for this scenario?What are the changes i need to make in the below code to meet above requirement. I am asking this because i couldn't find decent documentation of RabbitMq-Go.
func main() {
consumer()
}
func consumer() {
objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
initializeConn(&objConsumerConn.conn)
ch, err := objConsumerConn.conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
msgs, err := ch.Consume(
objConsumerConn.queueName, // queue
"demo1", // consumerTag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
k := new(EventCaptureData)
b := bytes.Buffer{}
b.Write(d.Body)
dec := gob.NewDecoder(&b)
err := dec.Decode(&k)
d.Ack(true)
if err != nil { fmt.Println("failed to fetch the data from consumer", err); }
fmt.Println(k)
}
}()
log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
<-forever
}
Edited question:
I have delayed the processing of the messages as suggested in the links link1 link2. But the problem is messages are getting back to their original queue from dead-lettered queue even after ttl. I am using RabbitMQ 3.0.0
. Can anyone point out what is the problem?
Upvotes: 4
Views: 7182
Reputation: 41878
Is there any concept of BasicConsumer vs EventingBasicConsumer in RabbitMq-go Reference?
Not exactly, but the Channel.Get
and Channel.Consume
calls serve a similar concept. With Channel.Get
you have a non-blocking call that gets the first message if there's any available, or returns ok=false
. With Channel.Consume
the queued messages are delivered to a channel.
What is Model in RabbitMQ and is it there in RabbitMq-go?
If you're referring to the IModel
and Connection.CreateModel
in C# RabbitMQ, that's something from the C# lib, not from RabbitMQ itself. It was just an attempt to abstract away from the RabbitMQ "Channel" terminology, but it never caught on.
How to send the objects when failed to dead-letter queue and again re-queue them after ttl
Use the delivery.Nack method with requeue=false
.
What is the significance of consumerTag argument in the ch.Consume function in the below code
The ConsumerTag
is just a consumer identifier. It can be used to cancel the channel with channel.Cancel, and to identify the consumer responsible for the delivery. All messages delivered with the channel.Consume
will have the ConsumerTag
field set.
Should we use the
channel.Get()
orchannel.Consume()
for this scenario?
I think channel.Get()
is almost never preferable over channel.Consume()
. With channel.Get
you'll be polling the queue and wasting CPU doing nothing, which doesn't make sense in Go.
What are the changes i need to make in the below code to meet above requirement.
Since you're batch processing 5 at a time, you can have a goroutine that receives from the consumer channel and once it gets the 5 deliveries you call another function to process them.
To acknowledge or send to the dead-letter queue you'll use the delivery.Ack or delivery.Nack functions. You can use multiple=true
and call it once for the batch. Once the message goes to the dead letter queue, you have to check the delivery.Headers["x-death"]
header for how many times its been dead-lettered and call delivery.Reject when its been retried for 5 times already.
Use channel.NotifyCancel to handle the cancellation event.
Upvotes: 4