Reputation: 61
I'm trying to publish the response to the incoming message in messageArrived(...)
. But the publish hang and the next line: logOutgoingMessage(topic, message)
is never called... At the end i get a deadlock and the client disconnect.
Here is my code:
@Startup
@Singleton
public class AppliMqttClient implements MqttCallback {
@EJB
private AppliFacade facade;
@PostConstruct
public void start() {
try {
// connection options
connOpts = new MqttConnectOptions();
connOpts.setKeepAliveInterval(120);
connOpts.setCleanSession(true);
connOpts.setWill(TESTAMENT_TOPIC, "DOWN!!!!!!!!!!!!!!!!!!".getBytes(), 0, false);
client = new MqttClient(BROKER_URL, MQTT_CLIENT_ID);
client.setCallback(this);
connect();
client.subscribe(SUBSCRIPTION_TOPIC, QoS);
} catch (MqttException me) {
log.error("Connection to " + BROKER_URL + " failed");
logMqttException(me);
}
}
private void connect() {
// Tying a cycle of reconnects.
boolean tryConnecting = true;
while (tryConnecting) {
try {
client.connect(connOpts);
} catch (Exception e1) {
log.error("Connection attempt failed with '" + e1.getCause() + "'. Retrying.");
}
if (client.isConnected()) {
log.info("Connected to Broker " + BROKER_URL);
tryConnecting = false;
} else {
pause();
}
}
}
private void publishAMessage(String topic, String pubMsg) {
MqttMessage message = new MqttMessage(pubMsg.getBytes());
message.setQos(QoS);
// Publish the message
log.info("Publishing to topic \"" + topic + "\" qos " + QoS);
try {
// Publish to the broker
client.publish(topic, message);
// Wait until the message has been delivered to the broker
logOutgoingMessage(topic, message);
} catch (Exception e) {
log.error("Publishing to topic \"" + topic + "\" qos " + QoS + "failed.", e);
}
}
private String handleRquest(AbstractRequest request) throws JsonProcessingException {
...
return jsonResp;
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// generate the response message ID
messageId = "EB" + System.currentTimeMillis();
// log the message
logIncomingMessage(topic, message);
// handle the message
AbstractRequest request = getMapper().readValue(message.toString(), AbstractRequest.class);
// handle the request
String jsonResp = handleRquest(request);
// publish message
publishAMessage(request.getReplyTopic(), jsonResp);
}
/**
*
* Method callback is invoked when a message published by this client is
* successfully received by the broker.
*
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// NOT NEEDED
}
}
Upvotes: 2
Views: 3452
Reputation: 11
It is possible to send a new message within an implementation of this callback (for example, a response to this message), but the implementation must not disconnect the client, as it will be impossible to send an acknowledgment for the message being processed, and a deadlock will occur.
Upvotes: 0
Reputation: 61
Changing the code as following works.
MqttDeliveryToken token;
...
MqttTopic mqttTopic = client.getTopic(topic);
try {
// Publish to the broker
token = mqttTopic.publish(new MqttMessage(pubMsg.getBytes()));
logOutgoingMessage(topic, message);
...
}
But I don't understand why the first implementation doesn't work :x May be publishing in messageArrived() with the QoS 2 is not appropriate ?
Upvotes: 4