WoJ
WoJ

Reputation: 29957

How to effectively set a connection timeout for MQTT?

I would like to make sure that my program crashes if it cannot connect to an MQTT server. To this, I set ConnectTimeout to 10 seconds but the call to MQTT hangs when connecting to a server that does not exist (the name does not exist)

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    timeout, _ := time.ParseDuration("10s");
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://this.does.not.resolve.example.coooom:1883")
    opts.SetClientID("monitor")
    opts.SetOrderMatters(false)
    opts.SetConnectRetry(true)
    opts.SetConnectTimeout(timeout)
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(fmt.Sprintf("cannot connect to MQTT: %v", token.Error()))
    }
}

How to correctly set the timeout?

Upvotes: 1

Views: 13974

Answers (2)

Brits
Brits

Reputation: 18255

From the docs for SetConnectRetry:

SetConnectRetry sets whether the connect function will automatically retry the connection in the event of a failure (when true the token returned by the Connect function will not complete until the connection is up or it is cancelled) If ConnectRetry is true then subscriptions should be requested in OnConnect handler Setting this to TRUE permits messages to be published before the connection is established.

So when SetConnectRetry(true) Connect() will retry until it succeeds or you stop it. No attempt is made to ascertain if an error is permanent because that is very difficult (for example the failure to resolve this.does.not.resolve.example.coooom may be due to loss of internet connection).

Note: There is a separate option SetAutoReconnect that controls whether a dropped connection (after the initial connection is successfully established) will lead to the client automatically re-establishing the connection.

Its worth noting that the main reason for adding the ConnectRetry option was to allow users to publish messages before the connection comes up (they will be sent automatically when the connection is established). The idea is that you should be able to call Connect and then use the library without worrying about the network status (obviously you are not going to receive messages if the network is down!).

The SetConnectTimeout docs may not be quite as clear as they should be in terms of how this interacts with SetConnectRetry:

SetConnectTimeout limits how long the client will wait when trying to open a connection to an MQTT server before timing out. A duration of 0 never times out. Default 30 seconds. Currently only operational on TCP/TLS connections.

So this controls how long we will wait for a single connection attempt to complete. When SetConnectRetry (true) this interval will be used every time a connection is attempted (starts afresh with each attempt). There is no RetryConnectionsFor type setting.

From self-answer:

... the connections fails correctly (and, hopefully, the client will still reconnect if the connection is dropped)

You cannot change the options once NewClient(o *ClientOptions) has been called (NewClient takes a copy of the options). Its done this way because changing options while operations are in progress would lead to unpredictable outcomes (e.g. if you call opts.SetConnectRetry(true) after calling Connect then the outcome would depend upon whether the Connect call had completed or not which is unpredictable).

I'm not quite sure what you mean by "make sure that my program crashes if it cannot connect to an MQTT server" but the approach I use is something like the following (substitute the appropriate action for the fmt.PrintLn statements) - note that I have not compiled/tested this:

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "time"
)

func main() {
    timeout, _ := time.ParseDuration("10s")
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://this.does.not.resolve.example.coooom:1883")
    opts.SetClientID("monitor")
    opts.SetOrderMatters(false)
    opts.SetAutoReconnect(true).SetMaxReconnectInterval(10 * time.Second)
    opts.SetConnectRetry(true)
    opts.SetConnectTimeout(timeout)
    client := mqtt.NewClient(opts)

    token := client.Connect()

    go func(token mqtt.Token) {
    for {
        done := token.WaitTimeout(1 * time.Minute)
        if done {
            if token.Error() != nil {
                fmt.Println("Connection permanently failed (most probably due to call to Disconnect)", token.Error())
            } else {
                fmt.Println("Connection established")
            }
            return // We are done!

        }
        fmt.Println("Async MQTT Connection still trying (there could be an issue!)")
        // Can call `client.Disconnect()` to cancel connection if you want to cancel the connection attempts
    }
    }(token)

    // Do some stuff - you can publish messages and the library will send them when the connection comes up
}

Upvotes: 1

WoJ
WoJ

Reputation: 29957

opts.SetConnectRetry(true) seems to put the connection in a loop (without any error messages). By changing the code to ...

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    timeout, _ := time.ParseDuration("10s");
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://this.does.not.resolve.example.coooom:1883")
    opts.SetClientID("monitor")
    opts.SetOrderMatters(false)
    opts.SetConnectTimeout(timeout)
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(fmt.Sprintf("cannot connect to MQTT: %v", token.Error()))
    }
    opts.SetConnectRetry(true)
}

... the connections fails correctly (and, hopefully, the client will still reconnect if the connection is dropped)

EDIT: I do not think that moving opts.SetConnectRetry(true) after the connection attempt will make it auto-reconnect (because the options have already been used). This is a catch-22 situation.

Upvotes: 1

Related Questions