Nishanthini Kathir
Nishanthini Kathir

Reputation: 41

MQTT acknowledgement

If I am using QOS type 1 means the broker will continue to send the message to the subscriber until it gets an acknowledgment. How can i set or return ack ? Please anyone shed some light on this.

This is my source code:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import java.util.Vector;

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

import com.adventnet.management.log.Log;
import com.adventnet.nms.util.NmsLogMgr;
public class DefaultMqttListener implements IMqttListener,Runnable{

    long count = 0;
    long start = System.currentTimeMillis();
    private HashMap serverDetailsHash;
    public DefaultMqttListener(HashMap serverProp)
    {
        this.serverDetailsHash = serverProp;
    }
    CallbackConnection myconnection;
    @Override
    public void init() {
        MQTT mqtt = new MQTT();
        String user = env("APOLLO_USER", (String)serverDetailsHash.get("userName"));    //No I18N
        String password = env("APOLLO_PASSWORD", (String)serverDetailsHash.get("password"));    //No I18N
        String host = env("APOLLO_HOST", (String)serverDetailsHash.get("hostName"));    //No I18N
        int port = Integer.parseInt(env("APOLLO_PORT", (String)serverDetailsHash.get("port")));
        try {
            mqtt.setHost(host, port);
            mqtt.setUserName(user);
            mqtt.setPassword(password);
            final CallbackConnection connection = mqtt.callbackConnection();
            myconnection = connection;
            connection.listener(new org.fusesource.mqtt.client.Listener() {
                public void onConnected() {
                }
                public void onDisconnected() {
                }
                public void onFailure(Throwable value) {
                    value.printStackTrace();
                    System.exit(-2);
                }
                public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
                        long time =  System.currentTimeMillis();
                        callback( topic,  msg,  ack,connection,time);
                }
            });
            connection.connect(new Callback<Void>() {
                @Override
                public void onSuccess(Void value) {
                    NmsLogMgr.M2MERR.log("MQTT Listener connected in ::::", Log.SUMMARY);
                    ArrayList getTopics = (ArrayList)serverDetailsHash.get("Topics");
                    for(int i=0;i<getTopics.size();i++)
                    {
                        HashMap getTopic = (HashMap)getTopics.get(i);
                        String topicName = (String) getTopic.get("topicName");
                        String qosType = (String) getTopic.get("qosType");
                        Topic[] topic = {new Topic(topicName, getQosType(qosType))};
                        connection.subscribe(topic, new Callback<byte[]>() {
                            public void onSuccess(byte[] qoses) {
                            }
                            public void onFailure(Throwable value) {
                                value.printStackTrace();
                                System.exit(-2);
                            }
                        });
                    }
                    //Topic[] topics = {new Topic("adminTest", QoS.AT_LEAST_ONCE),new Topic("adminTest1", QoS.AT_LEAST_ONCE)};
                }
                @Override
                public void onFailure(Throwable value) {
                    value.printStackTrace();
                    System.exit(-2);
                }
            });

            // Wait forever..
            synchronized (Listener.class) {
                while(true){
                    Listener.class.wait();}

            }
        } catch (URISyntaxException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null ){
            return defaultValue;}
        return rc;
    }

    @Override
    public void callback(UTF8Buffer topic, Buffer msg, Runnable ack, CallbackConnection connection, long time) {
        // TODO Auto-generated method stub
        try {
            String Message = msg.utf8().toString();
            MQTTMessage mqttMsg = new MQTTMessage();
            mqttMsg.setMQTTMessage(Message);
            mqttMsg.setTime(time);
            mqttMsg.setTopic(topic);
            mqttMsg.sethostName((String) serverDetailsHash.get("hostName"));
            MQTTCacheManager.mgr.addToCache(mqttMsg);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
            NmsLogMgr.M2MERR.log("myconnection closed", Log.SUMMARY);
            myconnection.disconnect(new Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
                System.exit(0);
            }
            @Override
            public void onFailure(Throwable value) {
                value.printStackTrace();
                System.exit(-2);
            }
        });

    }

    @Override
    public void run() {
        this.init();
        // TODO Auto-generated method stub
    }
    public QoS getQosType(String name)
    {
        Properties qosContainer = new Properties();
        qosContainer.put("0", QoS.AT_MOST_ONCE);
        qosContainer.put("1", QoS.AT_LEAST_ONCE);
        qosContainer.put("2", QoS.EXACTLY_ONCE);
        QoS qosName = (QoS) qosContainer.get(name);
        return qosName;
    }
}

Upvotes: 4

Views: 11680

Answers (2)

ppatierno
ppatierno

Reputation: 10065

I didn't use the Java library but you need to subscribe to the topic specifying the QoS level 1 (to have at least one delivery) or QoS level 2 (to have exactly once delivery). In these cases, the underlying library sends the ACK packets to the broker.

Upvotes: 1

hardillb
hardillb

Reputation: 59608

You don't send the acknowledgement in your code at all, it should all be handled by the MQTT library you are using.

The QOS ack's packets are between the publisher and the broker and then separately between the broker and any subscribers.

Upvotes: 4

Related Questions