Reputation: 3
package com.servicegateway.mqtt.client;
import java.util.UUID;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
public class MQTTTestClient {
private static Mqtt3AsyncClient _client = null;
public static void main(String[] args) {
System.out.println("Starting");
for(int i=0; i<2; i++) {
multiClient();
}
}
private static void multiClient() {
UUID uuid = UUID.randomUUID();
String clientID = "jdeon-" + uuid.toString();
System.out.println("\tClient ID: " + clientID);
// Instantiate the client object
_client = MqttClient.builder()
.useMqttVersion3()
.identifier(clientID)
.serverHost("localhost")
.serverPort(1883)
// Test server does not have SSL enabled
//.sslWithDefaultConfig()
.buildAsync();
// Connect to the MQTT Server
System.out.println("Attempting to connect...");
_client.connectWith()
.simpleAuth()
.username("my-user") // Not used by test server
.password("my-password".getBytes()) // Not used by test server
.applySimpleAuth()
.send()
.whenComplete((connAck, throwable) -> {
if (throwable != null) {
System.out.println("Error occurred connecting to server: " + throwable.getMessage());
throwable.printStackTrace();
} else {
handleConnectSuccess(connAck,clientID);
System.out.println("Done");
}
});
System.out.println("Main is done, but client connection code is still running due to asynchronous call.");
}
// Handle a successful connection to the server
private static void handleConnectSuccess(Mqtt3ConnAck connAck,String clientID) {
System.out.println("\tSuccessfully connected to server.");
// Display information returned in connAck variable
System.out.println("\tconnAck -> " + connAck);
// Subscribe to a topic
String topicName = "jdeon/testTopic "+clientID;
subscribeToTopic(topicName);
// Publish a message to the topic
String message = "Hello World! "+clientID;
publishToTopic(topicName, message);
// Disconnect from the MQTT server
if (_client != null) {
try {
System.out.println("\tWait 300ms before disconnecting from server");
Thread.sleep(30000L);
System.out.println("moving to _client.disconnect() method at line :78 "+_client);
_client.disconnect();
System.out.println("\tDisconnected");
} catch (InterruptedException e) {
System.out.println("Error sleeping for 3000ms");
e.printStackTrace();
}
}
}
// Subscribe to specified topic
private static void subscribeToTopic(String topicName) {
System.out.println("\tSubscribing to topic: " + topicName);
_client.subscribeWith()
.topicFilter(topicName)
.callback(publish -> {
// Process the received message
System.out.println("\tReceived message: " + publish);
String message = new String(publish.getPayloadAsBytes());
System.out.println("\t\tMessage content received: " + message);
})
.send()
.whenComplete((subAck, throwable) -> {
if (throwable != null) {
System.out.println("Error occurred subscribing to topic (" + topicName + "): " + throwable.getMessage());
throwable.printStackTrace();
} else {
System.out.println("\tSuccessfully subscribed to topic: " + topicName);
}
});
}
// Publish specified message to specified topic
private static void publishToTopic(String topicName, String message) {
System.out.println("\tPublishing message to topic: " + topicName+", message: "+message);
_client.publishWith()
.topic(topicName)
.payload(message.getBytes())
.send()
.whenComplete((publish, throwable) -> {
if (throwable != null) {
System.out.println("Error occurred publishing message to topic (" + topicName + "): " + throwable.getMessage());
throwable.printStackTrace();
} else {
System.out.println("\tSuccessfully published message to topic: " + topicName);
System.out.println("\t\tMessage content sent: " + message);
}
});
}
}
I am running above program in Eclipse and 1st client is getting disconnected gracefully and also getting logged in MQTT server (event.log) but second client is not getting disconnected. When I click "terminate" button in Eclipse, 2nd client is getting logged as "disconnected ungracefully" why it not getting disconnected until I terminate it? i attached program and console results are shown below :
Starting
Client ID: jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Attempting to connect...
Main is done, but client connection code is still running due to asynchronous call.
Client ID: jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Attempting to connect...
Main is done, but client connection code is still running due to asynchronous call.
Successfully connected to server.
connAck -> MqttConnAck{returnCode=SUCCESS, sessionPresent=false}
Subscribing to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Successfully connected to server.
connAck -> MqttConnAck{returnCode=SUCCESS, sessionPresent=false}
Subscribing to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Publishing message to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67, message: Hello World! jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Publishing message to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a, message: Hello World! jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Wait 300ms before disconnecting from server
Wait 300ms before disconnecting from server
Successfully subscribed to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Successfully subscribed to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Received message: MqttPublish{topic=jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67, payload=55byte, qos=AT_MOST_ONCE, retain=false}
Message content received: Hello World! jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Received message: MqttPublish{topic=jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a, payload=55byte, qos=AT_MOST_ONCE, retain=false}
Message content received: Hello World! jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
moving to _client.disconnect() method at line :78 com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView@a294d46
moving to _client.disconnect() method at line :78 com.hivemq.client.internal.mqtt.mqtt3.Mqtt3AsyncClientView@a294d46
Disconnected
Done
Successfully published message to topic: jdeon/testTopic jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Message content sent: Hello World! jdeon-8646a5c0-5b9a-46a6-815f-0050f96e7f67
Disconnected
Done
Successfully published message to topic: jdeon/testTopic jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Message content sent: Hello World! jdeon-39b85b5f-1f53-4f4e-b1c7-80f0527b953a
Upvotes: 0
Views: 269
Reputation: 26
Disclaimer: I haven't used the Hive async client yet so I do not know how many threads it uses.
If it uses multiple threads, your problem might be the unsychronized use of a single instance variable (_client). There are multiple possible race conditions here, e.g. the first client might reach publishToTopic() after _client has already been used to define the second client. In that case, the first client will trigger the next actions of the second client.
Try keeping the references of the clients separate (e.g. adding them as argument to the callback methods).
Upvotes: 1