Hansraj Patil
Hansraj Patil

Reputation: 3

MQTT JAVA client program - 2nd client showing as "disconnected ungracefully" when I terminate the server

package com.servicegateway.mqtt.client;

import java.util.UUID;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;

public class MQTTTestClient {

    private static Mqtt3AsyncClient _client = null;
    public static void main(String[] args) {
        System.out.println("Starting");
        for(int i=0; i<2; i++) {
            multiClient();
        }

    }

    private static void multiClient() {
        UUID uuid = UUID.randomUUID();
        String clientID = "jdeon-" + uuid.toString();
        System.out.println("\tClient ID: " + clientID);

        // Instantiate the client object
        _client = MqttClient.builder()
                .useMqttVersion3()
                .identifier(clientID)
                .serverHost("localhost")
                .serverPort(1883)
    // Test server does not have SSL enabled
    //.sslWithDefaultConfig()       
                .buildAsync();

        // Connect to the MQTT Server
        System.out.println("Attempting to connect...");
        _client.connectWith()
            .simpleAuth()
                .username("my-user")                      // Not used by test server
                .password("my-password".getBytes())       // Not used by test server
                .applySimpleAuth()
            .send()
            .whenComplete((connAck, throwable) -> {
                if (throwable != null) {
                    System.out.println("Error occurred connecting to server: " + throwable.getMessage());
                    throwable.printStackTrace();
                } else {
                    handleConnectSuccess(connAck,clientID);
                    System.out.println("Done");
                }
            });

        System.out.println("Main is done, but client connection code is still running due to asynchronous call.");
    }

    // Handle a successful connection to the server
    private static void handleConnectSuccess(Mqtt3ConnAck connAck,String clientID) {
        System.out.println("\tSuccessfully connected to server.");

        // Display information returned in connAck variable
        System.out.println("\tconnAck -> " + connAck);

        // Subscribe to a topic            
       String topicName = "jdeon/testTopic "+clientID;          
        subscribeToTopic(topicName);

        // Publish a message to the topic
        String message = "Hello World! "+clientID;
        publishToTopic(topicName, message);

        // Disconnect from the MQTT server
        if (_client != null) {
            try {
                System.out.println("\tWait 300ms before disconnecting from server");

                Thread.sleep(30000L);
                System.out.println("moving to _client.disconnect() method at line :78 "+_client);
                _client.disconnect();

                System.out.println("\tDisconnected");
            } catch (InterruptedException e) {
                System.out.println("Error sleeping for 3000ms");
                e.printStackTrace();
            }
        }
    }

    // Subscribe to specified topic
    private static void subscribeToTopic(String topicName) {
        System.out.println("\tSubscribing to topic: " + topicName);

        _client.subscribeWith()
            .topicFilter(topicName)
            .callback(publish -> {
                // Process the received message
                System.out.println("\tReceived message: " + publish);
                String message = new String(publish.getPayloadAsBytes());
                System.out.println("\t\tMessage content received: " + message);
            })
            .send()
            .whenComplete((subAck, throwable) -> {
                if (throwable != null) {
                    System.out.println("Error occurred subscribing to topic (" + topicName + "): " + throwable.getMessage());
                    throwable.printStackTrace();

                } else {
                    System.out.println("\tSuccessfully subscribed to topic: " + topicName);
                }
            });

    }

    // Publish specified message to specified topic
    private static void publishToTopic(String topicName, String message) {
        System.out.println("\tPublishing message to topic: " + topicName+",   message: "+message);

        _client.publishWith()
            .topic(topicName)
            .payload(message.getBytes())
            .send()
            .whenComplete((publish, throwable) -> {
                if (throwable != null) {
                    System.out.println("Error occurred publishing message to topic (" + topicName + "): " + throwable.getMessage());
                    throwable.printStackTrace();
                } else {
                    System.out.println("\tSuccessfully published message to topic: " + topicName);
                    System.out.println("\t\tMessage content sent: " + message);
                }
            });
    }

}

I am running above program in Eclipse and 1st client is getting disconnected gracefully and also getting logged in MQTT server (event.log) but second client is not getting disconnected. When I click "terminate" button in Eclipse, 2nd client is getting logged as "disconnected ungracefully" why it not getting disconnected until I terminate it? i attached program and console results are shown below :

Starting
    Client ID: jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Attempting to connect...
Main is done, but client connection code is still running due to asynchronous call.
    Client ID: jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Attempting to connect...
Main is done, but client connection code is still running due to asynchronous call.
    Successfully connected to server.
    connAck -> MqttConnAck{returnCode=SUCCESS, sessionPresent=false}
    Subscribing to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
    Successfully connected to server.
    connAck -> MqttConnAck{returnCode=SUCCESS, sessionPresent=false}
    Subscribing to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
    Publishing message to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67,   message: Hello World! jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
    Publishing message to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a,   message: Hello World! jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
    Wait 300ms before disconnecting from server
    Wait 300ms before disconnecting from server
    Successfully subscribed to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
    Successfully subscribed to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
    Received message: MqttPublish{topic=jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67, payload=55byte, qos=AT_MOST_ONCE, retain=false}
        Message content received: Hello World! jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
    Received message: MqttPublish{topic=jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a, payload=55byte, qos=AT_MOST_ONCE, retain=false}
        Message content received: Hello World! jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
moving to _client.disconnect() method at line :78 com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView@a294d46
moving to _client.disconnect() method at line :78 com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView@a294d46
    Disconnected
Done
    Successfully published message to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
        Message content sent: Hello World! jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
    Disconnected
Done
    Successfully published message to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
        Message content sent: Hello World! jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a

Upvotes: 0

Views: 269

Answers (1)

der_ambi
der_ambi

Reputation: 26

Disclaimer: I haven't used the Hive async client yet so I do not know how many threads it uses.

If it uses multiple threads, your problem might be the unsychronized use of a single instance variable (_client). There are multiple possible race conditions here, e.g. the first client might reach publishToTopic() after _client has already been used to define the second client. In that case, the first client will trigger the next actions of the second client.

Try keeping the references of the clients separate (e.g. adding them as argument to the callback methods).

Upvotes: 1

Related Questions