Reputation: 87
As connection opening each time for publishing is costly I'm trying to implement some way to keep the connection alive and share it in my app to publish messages.
var (
Connection *amqp.Connection
Channel *amqp.Channel
err error
)
func Connect() {
Connection, err = amqp.Dial("amqp://guest:guest@localhost:5672")
FailOnError(err, "Failed to connect to RabbitMQ")
Channel, err = Connection.Channel()
FailOnError(err, "Failed to open a channel")
}
func CloseConnection() {
err = Channel.Close()
FailOnError(err, "Failed to close channel ")
err = Connection.Close()
FailOnError(err, "Failed to close connection ")
}
func KeepAlive() {
queue, err := Channel.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "couldn't publish tics")
tic := "tic"
for {
err := Channel.Publish(
"", // exchange
queue.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(tic),
Expiration: "5000",
})
FailOnError(err, "couldn't publish tics")
time.Sleep(5 *time.Second)
}
}
func FailOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
The function KeepAlive
is an infinite loop that keeps sending a dummy message every 5 secs and that message have a TTL of 5 secs too so it gets destroyed.
func main() {
rabbitmq.Connect()
defer rabbitmq.CloseConnection()
go func() {
//publisher connection to stay alive as long as application is running
rabbitmq.KeepAlive()
}()
data_layer.OpenDBConnection()
router := gin.Default()
router.POST("/whatever", whatever)
err := router.Run()
if err != nil {
log.Fatal(err.Error())
}
}
Here I'm creating the connection and calling KeepAlive
in a goroutine so it can work in the background, keeping my connection alive all the time.
My questions:
I feel that this way is just a work around and although I've tried to search for examples how to keep it alive, it seems all of these examples are interested in the consumer side. Is there a cleaner way to keep my connection alive?
Is keeping my connection alive as long as my application is running bad? performance wise (network, memory usage)? note: I'm planning to monitor this with Prometheus to watch the performance but any note about what I might face would be helpful
Side note: these tics that are sent will be sent to a dummy queue since if I send it to my queue that I consume messages from by another service it will get stuck behind actual messages that doesn't have TTL and these tics will grow very large.
Upvotes: 2
Views: 5590
Reputation: 44807
With streadway/amqp
you don't need to implement the keepalive yourself. The library already provides this mechanism.
The method amqp.Dial
constructs a Connection
with a default heartbeat of 10 seconds. You can see the code here:
// connection.go
func Dial(url string) (*Connection, error) {
return DialConfig(url, Config{
Heartbeat: defaultHeartbeat,
Locale: defaultLocale,
})
}
This works by sending heartbeat frames on the open connection, which is going to be more efficient and maintainable than sending fake messages to a queue created only for that reason.
From the above follows that you can change the connection heartbeat with amqp.DialConfig
:
conn, err := amqp.DialConfig(url, amqp.Config{
Heartbeat: 5 * time.Second,
})
What you might want to implement yourself is the reconnect-on-error logic. For that you can find some useful information here: How to check if the channel is still working in streadway/amqp RabbitMQ client?
Upvotes: 3