raidensan

Reputation: 1139

How to solve async connection issue of paho mqtt client?

Background

I have been playing with MQTT for a project and encountered an odd issue. I am using Paho as my MQTT client and VerneMQ as the broker.

The VerneMQ broker service is up and running. I can confirm this by running netstat, which shows the 127.0.0.1:1883 entry in the LISTENING state.

This is my code for client:

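import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Producer implements MqttCallback {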

    private String brokerUri;
    private String clientId;

    public Producer(String brokerUri, String clientId){
        this.brokerUri = brokerUri;
        this.clientId = clientId;
    }

    public void doProduce(String topic, String payload){
        MemoryPersistence memoryPersistence = new MemoryPersistence();

        try {
            MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            mqttAsyncClient.setCallback(this);
            mqttAsyncClient.connect(mqttConnectOptions);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(payload.getBytes());
            mqttAsyncClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void connectionLost(Throwable throwable) {

    }


    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }


    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Message delivered!");
    }
}

The following is my Main class

public class Main {
    public static void main(String[] args) {
        Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
        producer.doProduce("dummyTopic", "dummyMessage");
    }
}

Issue

When I run my application, I see Client is not connected (32104) exception in the output.

If I change the line mqttAsyncClient.connect(mqttConnectOptions); to mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion(); in Producer class, I can successfully connect to broker and I can see Message delivered! in the output.

If I am not mistaken, waitForCompletion() blocks the calling thread until the broker responds. By adding it I have effectively turned my async connection into a blocking one, which is not what I want.

Question

How can I resolve this issue so that the Paho MQTT client connects to the broker in a non-blocking fashion? Have I missed something along the way?

Upvotes: 2

Views: 8821

Answers (1)

hardillb

Reputation: 59816

This is covered in the documentation for IMqttAsyncClient:

 IMqttToken token method(parms, Object userContext, IMqttActionListener callback)

In this form a callback is registered with the method. The callback will be notified when the action succeeds or fails. The callback is invoked on the thread managed by the MQTT client so it is important that processing is minimised in the callback. If not the operation of the MQTT client will be inhibited. For example to be notified (called back) when a connect completes:

IMqttToken conToken;
conToken = asyncClient.connect("some context", new IMqttActionListener() {
    public void onSuccess(IMqttToken asyncActionToken) {
        log("Connected");
    }

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        log("connect failed" + exception);
    }
});

An optional context object can be passed into the method which will then be made available in the callback. The context is stored by the MQTT client in the token which is then returned to the invoker. The token is provided to the callback methods where the context can then be accessed.

So your try/catch block should look like this:

try {
  MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  mqttConnectOptions.setCleanSession(true);
  mqttAsyncClient.setCallback(this);
  // connect() returns immediately; publish only once the connect has succeeded
  mqttAsyncClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
    public void onSuccess(IMqttToken asyncActionToken) {
      try {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(payload.getBytes());
        mqttAsyncClient.publish(topic, mqttMessage);
      } catch (MqttException e) {
        // publish() throws a checked exception that onSuccess cannot propagate
        e.printStackTrace();
      }
    }

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
      exception.printStackTrace();
    }
  });
} catch (MqttException e) {
  e.printStackTrace();
}
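
To see why publishing from the connect callback avoids the "Client is not connected" error, here is a minimal, library-free sketch of the same ordering problem. It does not use Paho at all: FakeAsyncClient, finishHandshake() and the two demo methods are hypothetical names made up for illustration, and a CompletableFuture stands in for the token/listener pair. It only assumes what the question describes, namely that connect() returns before the connection is established.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncConnectSketch {

    /** Hypothetical stand-in for an async MQTT client; NOT part of Paho. */
    static class FakeAsyncClient {
        private boolean connected = false;
        private final CompletableFuture<Void> connectDone = new CompletableFuture<>();
        final List<String> published = new ArrayList<>();

        /** Returns immediately with a pending future, like a non-blocking connect. */
        CompletableFuture<Void> connect() {
            return connectDone;
        }

        /** Simulates the broker accepting the connection some time later. */
        void finishHandshake() {
            connected = true;
            connectDone.complete(null); // runs any callbacks registered on the future
        }

        /** Rejects publishes made before the connection is up. */
        void publish(String topic, String payload) {
            if (!connected) {
                throw new IllegalStateException("Client is not connected");
            }
            published.add(topic + ":" + payload);
        }
    }

    /** Mirrors the question's code: publish right after connect() returns. */
    public static boolean publishImmediatelyFails() {
        FakeAsyncClient client = new FakeAsyncClient();
        client.connect(); // handshake has not finished yet
        try {
            client.publish("dummyTopic", "dummyMessage");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Mirrors the fix: publish from the callback that runs once connect completes. */
    public static List<String> publishInCallback() {
        FakeAsyncClient client = new FakeAsyncClient();
        client.connect().thenRun(() -> client.publish("dummyTopic", "dummyMessage"));
        client.finishHandshake(); // the registered callback fires here
        return client.published;
    }

    public static void main(String[] args) {
        System.out.println("publish before connect fails: " + publishImmediatelyFails());
        System.out.println("published from callback: " + publishInCallback());
    }
}
```

Nothing in the second method blocks the caller; the publish is simply deferred until the connect has completed, which is the role onSuccess plays in the Paho version above.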

Upvotes: 5
