Moayad Al-sowayegh

Reputation: 87

How to keep my connection alive for publishing messages with RabbitMQ streadway/amqp?

Since opening a connection for every publish is costly, I'm trying to keep a single connection alive and share it across my app for publishing messages.

var (
    Connection *amqp.Connection
    Channel    *amqp.Channel
    err        error
)

func Connect() {
    Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
    FailOnError(err, "Failed to connect to RabbitMQ")

    Channel, err = Connection.Channel()
    FailOnError(err, "Failed to open a channel")
}

func CloseConnection() {
    err = Channel.Close()
    FailOnError(err, "Failed to close channel ")
    err = Connection.Close()
    FailOnError(err, "Failed to close connection ")
}

func KeepAlive() {
    queue, err := Channel.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    FailOnError(err, "couldn't declare the tic queue")

    tic := "tic"
    for {
        err := Channel.Publish(
            "",         // exchange
            queue.Name, // routing key
            false,      // mandatory
            false,      // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(tic),
                Expiration:  "5000", // per-message TTL in milliseconds
            },
        )
        FailOnError(err, "couldn't publish tics")
        time.Sleep(5 * time.Second)
    }
}

func FailOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

The function KeepAlive is an infinite loop that sends a dummy message every 5 seconds. Each message also has a TTL of 5 seconds, so it expires and is discarded.

func main() {
    rabbitmq.Connect()
    defer rabbitmq.CloseConnection()
    go func() {
        //publisher connection to stay alive as long as application is running
        rabbitmq.KeepAlive()
    }()

    router := gin.Default()

    router.POST("/whatever", whatever)

    err := router.Run()
    if err != nil {
        log.Fatal(err)
    }
}

Here I'm creating the connection and calling KeepAlive in a goroutine so it can work in the background, keeping my connection alive all the time.

My questions:

Side note: the tics are sent to a dummy queue. If I sent them to the queue that another service consumes from, they would get stuck behind actual messages that don't have a TTL, and the backlog of tics would grow very large.

Upvotes: 2

Views: 5590

Answers (1)


Reputation: 44807

With streadway/amqp you don't need to implement the keepalive yourself. The library already provides this mechanism.

The method amqp.Dial constructs a Connection with a default heartbeat of 10 seconds. You can see the code here:

// connection.go
func Dial(url string) (*Connection, error) {
    return DialConfig(url, Config{
        Heartbeat: defaultHeartbeat,
        Locale:    defaultLocale,
    })
}

This works by sending heartbeat frames on the open connection, which is going to be more efficient and maintainable than sending fake messages to a queue created only for that reason.

It follows that you can change the connection's heartbeat interval with amqp.DialConfig:

    conn, err := amqp.DialConfig(url, amqp.Config{
        Heartbeat: 5 * time.Second,
    })

What you might want to implement yourself is the reconnect-on-error logic. For that you can find some useful information here: How to check if the channel is still working in streadway/amqp RabbitMQ client?

Upvotes: 3
