Reputation: 216
I am trying to publish MQTT messages concurrently from 5 Java clients, with each client publishing 1000 messages on a particular topic to an MQTT broker (HiveMQ).
I start multiple threads; each thread creates an MQTT client, connects to the broker over SSL, and tries to publish 1000 messages. Messages are being sent, but not all of the connections to the broker succeed, and I keep getting this exception:
Client is not connected (32104)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:199)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:583)
at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:575)
at com.test.MqttPublishSample.publishMessages(MqttPublishSample.java:122)
at com.test.MqttPublishSample.lambda$start$0(MqttPublishSample.java:74)
at java.base/java.lang.Thread.run(Thread.java:834)
public class MqttPublishSample {
    public static void main(String... args) throws InterruptedException {
        new MqttPublishSample().start();
    }

    public void start() throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                MqttClient client = null;
                try {
                    client = obtainConnection(); // code to obtain connection using MqttClient
                    publishMessages(client);     // code to publish message using simple for loop
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    public MqttClient obtainConnection() throws MqttException {
        String clientId = "sslTestClient" + ThreadLocalRandom.current().nextInt(0, 5);
        MqttClient client = null;
        try {
            client = new MqttClient("ssl://localhost:8883", clientId, new MemoryPersistence());
        } catch (MqttException e) {
            e.printStackTrace();
        }
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName("user1");
        mqttConnectOptions.setPassword("pass1".toCharArray());
        try {
            mqttConnectOptions.setSocketFactory(getTruststoreFactory());
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("connecting...");
        client.connect(mqttConnectOptions);
        return client;
    }
I expect all clients to connect to the broker successfully and publish their messages without this exception.
Upvotes: 1
Views: 2136
Reputation: 1217
It might be that you are using the same client ID in more than one thread, in which case the broker will disconnect the client that was already connected with that ID. Since you are using ThreadLocalRandom.current().nextInt(0,5), there is a real chance of collision, as there are only 5 possible values. You could use a unique identifier such as the one provided by MqttClient.generateClientId(), or share a method between the threads that keeps track of the IDs already handed out.
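To put a number on the collision risk: with 5 threads each drawing from 5 possible values, the chance that all five IDs differ is 5!/5^5 = 120/3125, or roughly 3.8%, so a clash is close to certain. As a rough sketch of the "shared method" idea, the snippet below hands out IDs from a single atomic counter. The ClientIds class and its next method are hypothetical names made up for this illustration, not part of Paho, and the MQTT connection itself is left out so the example runs with only the JDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Paho): returns a distinct client ID per call.
final class ClientIds {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    private ClientIds() {}

    // Safe to call from several threads: incrementAndGet is atomic,
    // so no two callers receive the same counter value.
    static String next(String prefix) {
        return prefix + "-" + COUNTER.incrementAndGet();
    }
}

class UniqueClientIdDemo {
    public static void main(String[] args) throws InterruptedException {
        Set<String> ids = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                String clientId = ClientIds.next("sslTestClient");
                ids.add(clientId);
                // In the question's code, this clientId would replace the
                // ThreadLocalRandom-based one passed to the MqttClient constructor.
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(ids.size() + " distinct client IDs"); // prints "5 distinct client IDs"
    }
}
```

A counter like this only guarantees uniqueness within one JVM. If several processes connect to the same broker, something like MqttClient.generateClientId() or a UUID-based suffix is probably the safer choice.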
Upvotes: 2