Naresh

Reputation: 5397

RabbitMQ consumer in Go

I am trying to write a RabbitMQ consumer in Go. It is supposed to take 5 objects at a time from the queue and process them. It should acknowledge them if they are processed successfully; otherwise it should send them to the dead-letter queue up to 5 times and then discard them. It should run indefinitely and handle the consumer's cancellation event. I have a few questions:

  1. Is there any concept of BasicConsumer vs EventingBasicConsumer in RabbitMq-go Reference?
  2. What is Model in RabbitMQ and is it there in RabbitMq-go?
  3. How to send the objects when failed to the dead-letter queue and re-queue them again after the TTL?
  4. What is the significance of the consumerTag argument in the ch.Consume function in the code below?
  5. Should we use the channel.Get() or channel.Consume() for this scenario?

What changes do I need to make to the code below to meet the above requirements? I am asking because I couldn't find decent documentation for RabbitMq-Go.

    func main() {
        consumer()
    }

    func consumer() {
        objConsumerConn := &rabbitMQConn{queueName: "EventCaptureData", conn: nil}
        initializeConn(&objConsumerConn.conn)

        ch, err := objConsumerConn.conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        msgs, err := ch.Consume(
            objConsumerConn.queueName, // queue
            "demo1",                   // consumerTag
            false,                     // auto-ack
            false,                     // exclusive
            false,                     // no-local
            false,                     // no-wait
            nil,                       // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
            for d := range msgs {
                k := new(EventCaptureData)
                b := bytes.Buffer{}
                b.Write(d.Body)
                dec := gob.NewDecoder(&b)
                err := dec.Decode(&k)
                d.Ack(true)

                if err != nil {
                    fmt.Println("failed to fetch the data from consumer", err)
                }
                fmt.Println(k)
            }
        }()

        log.Printf(" Waiting for Messages to process. To exit press CTRL+C ")
        <-forever
    }

Edited question:

I have delayed the processing of the messages as suggested in link1 and link2. The problem is that messages are returning from the dead-letter queue to their original queue even after the TTL. I am using RabbitMQ 3.0.0. Can anyone point out what the problem is?

Upvotes: 4

Views: 7182

Answers (1)

Pedro Werneck

Reputation: 41878

Is there any concept of BasicConsumer vs EventingBasicConsumer in RabbitMq-go Reference?

Not exactly, but the Channel.Get and Channel.Consume calls cover similar ground. Channel.Get is a non-blocking call that returns the first message if one is available, or ok=false otherwise. With Channel.Consume, the queued messages are delivered to a Go channel.

What is Model in RabbitMQ and is it there in RabbitMq-go?

If you're referring to the IModel and Connection.CreateModel in C# RabbitMQ, that's something from the C# lib, not from RabbitMQ itself. It was just an attempt to abstract away from the RabbitMQ "Channel" terminology, but it never caught on.

How to send the objects when failed to dead-letter queue and again re-queue them after ttl

Use the delivery.Nack method with requeue=false.
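
Note that a Nack with requeue=false only dead-letters the message if the queue was declared with a dead-letter exchange; otherwise the message is simply dropped. Below is a minimal sketch of the queue arguments for a delayed-retry setup. The argument keys (x-dead-letter-exchange, x-dead-letter-routing-key, x-message-ttl) are RabbitMQ's own, but the exchange names and the retry queue name are hypothetical, and the helper functions are mine, not part of any library. The maps are plain map[string]interface{}, which is the underlying type of amqp.Table, so they should convert directly when passed as the args of QueueDeclare.

```go
package main

// Hypothetical names, used only for illustration. Only the work queue
// name comes from the question.
const (
	workExchange  = "events"
	workQueue     = "EventCaptureData"
	retryExchange = "events.retry"
	retryQueue    = "EventCaptureData.retry"
)

// workQueueArgs returns the arguments for the main queue. A message that
// is nacked or rejected with requeue=false is republished by the broker
// to retryExchange, keeping its original routing key.
func workQueueArgs() map[string]interface{} {
	return map[string]interface{}{
		"x-dead-letter-exchange": retryExchange,
	}
}

// retryQueueArgs returns the arguments for the holding queue, which must
// be bound to retryExchange and must have no consumers. Messages sit
// there for ttlMillis, expire, and are dead-lettered back to the work
// exchange with the work queue's routing key.
func retryQueueArgs(ttlMillis int32) map[string]interface{} {
	return map[string]interface{}{
		"x-message-ttl":             ttlMillis,
		"x-dead-letter-exchange":    workExchange,
		"x-dead-letter-routing-key": workQueue,
	}
}
```

With this layout nobody consumes from the retry queue; expiry alone moves messages back, which is what produces the delay between attempts.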

What is the significance of consumerTag argument in the ch.Consume function in the below code

The ConsumerTag is just a consumer identifier. It can be used to cancel the consumer with channel.Cancel, and to identify the consumer responsible for a delivery. All messages delivered through channel.Consume will have the ConsumerTag field set.

Should we use the channel.Get() or channel.Consume() for this scenario?

I think channel.Get() is almost never preferable over channel.Consume(). With channel.Get you'll be polling the queue and wasting CPU doing nothing, which doesn't make sense in Go.

What are the changes i need to make in the below code to meet above requirement.

  1. Since you're batch processing 5 at a time, you can have a goroutine that receives from the consumer channel and once it gets the 5 deliveries you call another function to process them.

  2. To acknowledge or send to the dead-letter queue you'll use the delivery.Ack or delivery.Nack functions. You can use multiple=true and call it once for the batch. Once the message goes to the dead-letter queue, you have to check the delivery.Headers["x-death"] header for how many times it has been dead-lettered, and call delivery.Reject when it has already been retried 5 times.

  3. Use channel.NotifyCancel to handle the cancellation event.
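
The batching and retry-counting steps above can be sketched roughly as follows. This is only an illustration under a few assumptions: the delivery struct is a hypothetical stand-in for amqp.Delivery so the snippet compiles without the client library, and collectBatches and rejectedCount are names I made up. The x-death layout (a list of tables with a "reason" and, on newer brokers, a "count") follows RabbitMQ's dead-lettering documentation; older brokers such as 3.0.0 add one entry per death without a count, so the code falls back to counting entries. As far as I know the real client decodes the nested entries as amqp.Table, so the type assertion would have to name that type instead of map[string]interface{}.

```go
package main

// delivery is a hypothetical stand-in for amqp.Delivery, reduced to the
// two fields this sketch needs.
type delivery struct {
	Body    []byte
	Headers map[string]interface{}
}

// collectBatches groups deliveries from in into slices of n and sends
// each full slice on the returned channel. When in is closed, a partial
// final batch is flushed and the output channel is closed.
func collectBatches(in <-chan delivery, n int) <-chan []delivery {
	out := make(chan []delivery)
	go func() {
		defer close(out)
		batch := make([]delivery, 0, n)
		for d := range in {
			batch = append(batch, d)
			if len(batch) == n {
				out <- batch
				batch = make([]delivery, 0, n)
			}
		}
		if len(batch) > 0 {
			out <- batch
		}
	}()
	return out
}

// rejectedCount reports how many times a message was dead-lettered
// because a consumer rejected it, based on its "x-death" header.
// Entries with other reasons (for example "expired") are ignored.
func rejectedCount(headers map[string]interface{}) int64 {
	entries, ok := headers["x-death"].([]interface{})
	if !ok {
		return 0 // header missing: never dead-lettered
	}
	var total int64
	for _, e := range entries {
		entry, ok := e.(map[string]interface{})
		if !ok || entry["reason"] != "rejected" {
			continue
		}
		if n, ok := entry["count"].(int64); ok {
			total += n
		} else {
			total++ // older brokers: one entry per death, no count field
		}
	}
	return total
}
```

In the real consumer, the input channel would be the one returned by ch.Consume, and calling ch.Qos(5, 0, false) beforehand should keep the broker from pushing more than 5 unacknowledged messages at a time. Keep in mind that this collector waits indefinitely for the fifth message; if partial batches must be processed promptly, a timer in the loop would be needed.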

Upvotes: 4
