Phoste
Phoste

Reputation: 1209

How to reconnect to MQTT with Paho drivers

I'm using the MQTT client Java library for some of my applications and I think I'm reconnecting to the broker the bad way. For now I have a class who handles the MQTT actions like connect, disconnect, publish and subscribe.

public class MqttConnection {

    private static final String BROKER_ADDRESS = Preferences.getProperty("mqtt-address");
    private static final String BROKER_PORT = Preferences.getProperty("mqtt-port");
    private static final String BROKER_URI = "tcp://" + BROKER_ADDRESS + ":" + BROKER_PORT;

    private static final String VHOST = Preferences.getProperty("mqtt-vhost");
    private static final String USERNAME = Preferences.getProperty("mqtt-username");
    private static final String PASSWORD = Preferences.getProperty("mqtt-password");

    private static MqttClient client;

    private static final Logger logger = LogManager.getLogger(MqttConnection.class);

    static {
        try {
            client = new MqttClient(BROKER_URI, MqttClient.generateClientId());
        } catch (MqttException ex) {
            logger.fatal(ex);
        }
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable thrwbl) {
                logger.info("MQTT : Perte de connexion...");
                MqttConnection.start();
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // CODE HERE
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken imdt) { }
        });
    }

    public static void start() {
        connect();
    }

    private static void connect() {
        if (!client.isConnected()) {
            try {
                if (Preferences.getProperty("mqtt-isauth").equalsIgnoreCase("true")) {
                    MqttConnectOptions options = new MqttConnectOptions();
                    String username = (VHOST.equals("")) ? USERNAME : VHOST + ":" + USERNAME;
                    options.setUserName(username);
                    options.setPassword(PASSWORD.toCharArray());
                    client.connect(options);
                } else {
                    client.connect();
                }
                logger.info("MQTT : Connecté au broker.");
            } catch (MqttException ex) {
                logger.fatal(ex);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    logger.fatal(e);
                }
                connect();
            }
        }
    }

    private static void subscribe() {
        if (client.isConnected()) {
            try {
                client.subscribe("+/SWI1");
            } catch (MqttException e) {
                logger.fatal(e);
            }
        }
    }
}

But it seems to open many connections when it tries to reconnect, and makes the broker bug. What is the best way to reconnect to a MQTT broker ?

Upvotes: 2

Views: 1390

Answers (2)

trickyedecay
trickyedecay

Reputation: 31

I tried to use MqttConnectOptions.setAutomaticReconnect(true) but it cannot reconnect successfully when connection failure.

so i try to manage reconnect myself using some code like below and it work!

// make the client nullable and set it to null before to reconnect
private var mqttClient: MqttAndroidClient? = null
// setup connection options and connect
private fun setupMQTT(){
    Log.d("mqtt-log", "setup mqtt")
    val serverIP = "192.168.0.198"
    val serverPort = "1883"
    val serverURI = "tcp://${serverIP}:${serverPort}"
    mqttClient = MqttAndroidClient(this, serverURI, "kotlin_client")
    val options = MqttConnectOptions()
    // don't use build-in automaticRecconnect
    options.isAutomaticReconnect = false
    options.connectionTimeout = 0
    options.isCleanSession = false
    try {
        mqttClient!!.connect(options, null, iMqttActionListener)
    } catch (e: MqttException) {
        e.printStackTrace()
        Log.d("mqtt-log", "Exception")
    }
}
// handle connection action
val iMqttActionListener = object : IMqttActionListener {
    override fun onSuccess(asyncActionToken: IMqttToken?) {
        Log.d("mqtt-log", "Connection success")
        subscribeMQTT() // some methods for subscribing topics
    }

    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
        Log.d("mqtt-log", "Connection failure")

        // retry after 3s.
        // its the very important part to retry.
        Handler(Looper.getMainLooper()).postDelayed({
            with(mqttClient!!) {
                unregisterResources()
                close()
                disconnect()
                setCallback(null)
            }
            mqttClient = null
            setupMQTT() 
        },3000)
    }
}
// when connection is complete try to subscribe topics
private fun subscribeMQTT(){
    mqttClient?.setCallback(object : MqttCallback {
        override fun messageArrived(topic: String?, message: MqttMessage?) {
            Log.d("mqtt-log", "Receive message: ${message.toString()} from topic: $topic")
        }

        override fun connectionLost(cause: Throwable?) {
            Log.d("mqtt-log", "Connection lost ${cause.toString()}")
            Handler(Looper.getMainLooper()).postDelayed({
                with(mqttClient!!) {
                    unregisterResources()
                    close()
                    disconnect()
                    setCallback(null)
                }
                mqttClient = null
                setupMQTT()
            },3000)
        }

        override fun deliveryComplete(token: IMqttDeliveryToken?) {

        }
    })
    mqttClient?.subscribe("topic", 1, null, object : IMqttActionListener {
        override fun onSuccess(asyncActionToken: IMqttToken?) {
            Log.d("mqtt-log", "Subscribed to topic")
        }

        override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
            Log.d("mqtt-log", "Failed to subscribe topic")
        }
    })
}

and the build-in setAutomaticReconnect(true) will only reconnect when the connection is already setup. manage reconnection by myself can trigger reconnect before the connection is setup.

Upvotes: 0

Amit Jain
Amit Jain

Reputation: 36

You can use MqttConnectOptions.setAutomaticReconnect(true) to enable automatic reconnect.

Upvotes: 0

Related Questions