Guy Yannick
Guy Yannick

Reputation: 61

Trying to publish in messageArrived() with the Paho Client MqttCallback

I'm trying to publish the response to the incoming message in messageArrived(...). But the publish hang and the next line: logOutgoingMessage(topic, message) is never called... At the end i get a deadlock and the client disconnect.

Here is my code:

@Startup
@Singleton
public class AppliMqttClient implements MqttCallback {

@EJB
private AppliFacade facade;

@PostConstruct
public void start() {
    try {
        // connection options
        connOpts = new MqttConnectOptions();
        connOpts.setKeepAliveInterval(120);         
        connOpts.setCleanSession(true);
        connOpts.setWill(TESTAMENT_TOPIC, "DOWN!!!!!!!!!!!!!!!!!!".getBytes(), 0, false);

        client = new MqttClient(BROKER_URL, MQTT_CLIENT_ID);
        client.setCallback(this);
        connect();

        client.subscribe(SUBSCRIPTION_TOPIC, QoS);
    } catch (MqttException me) {
        log.error("Connection to " + BROKER_URL + " failed");
        logMqttException(me);
    }

}

private void connect() {
    // Tying a cycle of reconnects.
    boolean tryConnecting = true;
    while (tryConnecting) {
        try {
            client.connect(connOpts);
        } catch (Exception e1) {
            log.error("Connection attempt failed with '" + e1.getCause() + "'. Retrying.");             
        }
        if (client.isConnected()) {
            log.info("Connected to Broker " + BROKER_URL);
            tryConnecting = false;
        } else {
            pause();
        }
    }
}

private void publishAMessage(String topic, String pubMsg) {
    MqttMessage message = new MqttMessage(pubMsg.getBytes());
    message.setQos(QoS);
    // Publish the message
    log.info("Publishing to topic \"" + topic + "\" qos " + QoS);
    try {
        // Publish to the broker
        client.publish(topic, message);
        // Wait until the message has been delivered to the broker
        logOutgoingMessage(topic, message);
    } catch (Exception e) {
        log.error("Publishing to topic \"" + topic + "\" qos " + QoS + "failed.", e);
    }
}

private String handleRquest(AbstractRequest request) throws JsonProcessingException {
    ...

    return jsonResp;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // generate the response message ID
    messageId = "EB" + System.currentTimeMillis();

    // log the message
    logIncomingMessage(topic, message);

    // handle the message
    AbstractRequest request = getMapper().readValue(message.toString(), AbstractRequest.class);

    // handle the request
    String jsonResp = handleRquest(request);

    // publish message
    publishAMessage(request.getReplyTopic(), jsonResp);
}

/**
 * 
 * Method callback is invoked when a message published by this client is
 * successfully received by the broker.
 * 
 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    // NOT NEEDED
}

}

Upvotes: 2

Views: 3452

Answers (2)

Hisham
Hisham

Reputation: 11

It is possible to send a new message within an implementation of this callback (for example, a response to this message), but the implementation must not disconnect the client, as it will be impossible to send an acknowledgment for the message being processed, and a deadlock will occur.

official link from eclipse.org

Upvotes: 0

Guy Yannick
Guy Yannick

Reputation: 61

Changing the code as following works.

MqttDeliveryToken token;
...
MqttTopic mqttTopic = client.getTopic(topic);
try {
  // Publish to the broker
  token = mqttTopic.publish(new MqttMessage(pubMsg.getBytes()));
  logOutgoingMessage(topic, message);
  ...
 }

But I don't understand why the first implementation doesn't work :x May be publishing in messageArrived() with the QoS 2 is not appropriate ?

Upvotes: 4

Related Questions