Reputation: 1139
Background
I have been playing with MQTT for a project and encountered a odd issue. I am using paho
as my MQTT client and VerneMQ
as broker.
VerneMQ broker service is up and running, I can confirm this by runnnig netstat
and I can see that 127.0.0.1:1883
entry is in LISTENING
mode.
This is my code for client:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
The following is my Main class
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
Issue
When I run my application, I see Client is not connected (32104)
exception in the output.
If I change the line mqttAsyncClient.connect(mqttConnectOptions);
to mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();
in Producer
class, I can successfully connect to broker and I can see Message delivered!
in the output.
If I am not mistaken waitForCompletion()
will block the call until a response is received. And by adding this line I have effectively changed my AsyncClient connection to blocking connection, which is not the desired approach for me.
Question
How can I resolve this issue so paho MQTT client connects to broker in a non-blocking fashion? Have I missed something along the way?
Upvotes: 2
Views: 8821
Reputation: 59816
This covered in the documentation for the IMqttAsyncClient
IMqttToken token method(parms, Object userContext, IMqttActionListener callback)
In this form a callback is registered with the method. The callback will be notified when the action succeeds or fails. The callback is invoked on the thread managed by the MQTT client so it is important that processing is minimised in the callback. If not the operation of the MQTT client will be inhibited. For example to be notified (called back) when a connect completes:
IMqttToken conToken; conToken = asyncClient.connect("some context", new MqttAsyncActionListener() { public void onSuccess(IMqttToken asyncActionToken) { log("Connected"); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log ("connect failed" +exception); } });
An optional context object can be passed into the method which will then be made available in the callback. The context is stored by the MQTT client) in the token which is then returned to the invoker. The token is provided to the callback methods where the context can then be accessed.
So your try/catch block should look like this:
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
});
} catch (MqttException e) {
e.printStackTrace();
}
Upvotes: 5