spidergears
spidergears

Reputation: 194

Android MQTT using Paho Client. Unable to receive messages

I have an Android Service that listens for MQTT messages; the code is below. For some reason the service is able to connect and subscribe to the topic, but it is unable to read messages: messageArrived is never called.

/**
 * Bound service that connects to an MQTT broker when created and logs every
 * message received on the topics it is subscribed to.
 *
 * <p>The service is its own {@link MqttCallback}; it registers itself on the
 * client before connecting so that {@link #messageArrived} is actually invoked.
 */
public class FollowService extends Service implements MqttCallback{

    private final IBinder localBinder = new FollowServiceBinder();
    private final String TAG = "Service";
    private MqttClient mqClient;

    /** Binder handed to bound clients so they can reach this service instance. */
    public class FollowServiceBinder extends Binder {
        public FollowService getService() {
            return FollowService.this;
        }
    }

    public FollowService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
        return localBinder;
    }

    /**
     * Creates the MQTT client, registers this service as its callback and
     * connects to the broker. Failures are logged, not rethrown.
     */
    @Override
    public void onCreate() {
        super.onCreate();
        try {
            mqClient = new MqttClient("tcp://192.168.1.46:1883", "sadfsfi", new MemoryPersistence());
            // Without a registered callback the client has nowhere to deliver
            // incoming messages, so messageArrived() would never be called.
            mqClient.setCallback(this);
            mqClient.connect();
            Log.i(TAG, "Connected to client");
        }
        catch(MqttException me){
            Log.e(TAG, "MqttClient Exception Occured in on create!!!", me);
        }
    }

    /**
     * Disconnects from the broker and releases the client when the service is
     * destroyed, so the connection does not outlive the service.
     */
    @Override
    public void onDestroy() {
        if (mqClient != null) {
            try {
                if (mqClient.isConnected()) {
                    mqClient.disconnect();
                }
                mqClient.close();
            }
            catch (MqttException me){
                Log.e(TAG, "MqttClient Exception Occured in on destroy!!!", me);
            }
        }
        super.onDestroy();
    }

    /**
     * Subscribes to the "test" topic. Does nothing (apart from logging) when
     * the client could not be created or is not connected.
     */
    @Keep
    public void beginFollowing(){
        // onCreate() may have failed, leaving mqClient null or disconnected.
        if (mqClient == null || !mqClient.isConnected()) {
            Log.e(TAG, "Cannot subscribe: MqttClient is not connected");
            return;
        }
        try {
            mqClient.subscribe("test");
            Log.i(TAG, "Subscribed test");
        }
        catch (MqttException me){
            Log.e(TAG, "MqttClient Exception Occured in following!!!", me);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        // Keep the cause in the log; it is the only hint about why the link dropped.
        Log.i(TAG, "ConnectionLost", cause);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.i(TAG, "Delivered");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        Log.i(TAG, "Received update: " + topic + ":" + message.toString());
    }
}

Upvotes: 1

Views: 2573

Answers (1)

Bertrand Martel
Bertrand Martel

Reputation: 45372

The Eclipse Paho Android Service is a client dedicated to Android that you can use instead of the regular MqttClient. It may solve your problem (if you are sure the problem is not on the MQTT server side), as well as some other problems you may run into in the future if you want to set up an MQTT service on Android:

If you want to give it a try :

in build.gradle :

// Paho MQTT v3 client library
compile  'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.3-SNAPSHOT'
// Paho Android service; its own 'support-v4' module is excluded here
compile  ('org.eclipse.paho:org.eclipse.paho.android.service:1.0.3-SNAPSHOT'){
    exclude module: 'support-v4'
}
// support-v4 is declared explicitly, pinned to 22.1.0
compile 'com.android.support:support-v4:22.1.0'

In AndroidManifest.xml :

<uses-permission android:name="android.permission.INTERNET" />

and in your <application></application> :

<service android:name="org.eclipse.paho.android.service.MqttService" />

Here is an example MqttHandler.java :

public class MqttHandler {

    protected final static String TAG = DeviceHandler.class.getSimpleName();

    /**
     * MQTT client
     */
    private MqttAndroidClient mClient = null;

    /**
     * client ID used to authenticate
     */
    protected String mClientId = "";

    /**
     * Android context
     */
    private Context mContext = null;

    /**
     * callback for MQTT events
     */
    private MqttCallback mClientCb = null;

    /**
     * callback for MQTT connection
     */
    private IMqttActionListener mConnectionCb = null;

    /**
     * Sets whether the client and server should remember state across restarts and reconnects
     */
    protected boolean mCleanSessionDefault = false;

    /**
     * Sets the connection timeout value (in seconds)
     */
    protected int mTimeoutDefault = 30;

    /**
     * Sets the "keep alive" interval (in seconds)
     */
    protected int mKeepAliveDefault = 60;

    /**
     * connection state
     */
    private boolean connected = false;

    /**
     * list of message callbacks
     */
    private List<IMessageCallback> mMessageCallbacksList = new ArrayList<>();

    private final static String SERVER_URI = "192.168.1.46";

    private final static int SERVER_PORT = 1883;

    public MqttHandler(Context context) {

        this.mContext = context;

        this.mClientCb = new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                connected = false;
                for (int i = 0; i < mMessageCallbacksList.size(); i++) {
                    mMessageCallbacksList.get(i).connectionLost(cause);
                }
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                for (int i = 0; i < mMessageCallbacksList.size(); i++) {
                    mMessageCallbacksList.get(i).messageArrived(topic, mqttMessage);
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                for (int i = 0; i < mMessageCallbacksList.size(); i++) {
                    mMessageCallbacksList.get(i).deliveryComplete(token);
                }
            }
        };
    }

    public boolean isConnected() {
        if (mClient == null)
            return false;
        else
            return connected;
    }

    public void connect() {

        try {
            if (!isConnected()) {

                MqttConnectOptions options = new MqttConnectOptions();

                String serverURI = "";

                options.setCleanSession(mCleanSessionDefault);
                options.setConnectionTimeout(mTimeoutDefault);
                options.setKeepAliveInterval(mKeepAliveDefault);

                mClient = new MqttAndroidClient(mContext, "tcp://" + SERVER_URI + ":" + SERVER_PORT, mClientId);
                mClient.setCallback(mClientCb);

                mConnectionCb = new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken iMqttToken) {
                        connected = true;
                        for (int i = 0; i < mMessageCallbacksList.size(); i++) {
                            mMessageCallbacksList.get(i).onConnectionSuccess(iMqttToken);
                        }
                    }

                    @Override
                    public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
                        connected = false;
                        for (int i = 0; i < mMessageCallbacksList.size(); i++) {
                            mMessageCallbacksList.get(i).onConnectionFailure(iMqttToken, throwable);
                        }
                    }
                };

                try {
                    mClient.connect(options, mContext, mConnectionCb);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            } else {
                Log.v(TAG, "cant connect - already connected");
            }
        } catch (IllegalArgumentException e) {
            Log.v(TAG, "parameters error. cant connect");
        }
    }

    public void disconnect() {

        if (isConnected()) {
            try {
                mClient.disconnect(mContext, mConnectionCb);

            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            Log.v(TAG, "cant disconnect - already disconnected");
        }
    }

    /**
     * Publish a message to MQTT server
     *
     * @param topic      message topic
     * @param message    message body
     * @param isRetained define if message should be retained on MQTT server
     * @param listener   completion listener (null allowed)
     * @return
     */
    public IMqttDeliveryToken publishMessage(String topic, String message, boolean isRetained, IMqttActionListener listener) {

        if (isConnected()) {

            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setRetained(isRetained);
            mqttMessage.setQos(0);

            try {
                return mClient.publish(topic, mqttMessage, mContext, listener);
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            Log.e(TAG, "cant publish message. Not connected");
        }
        return null;
    }

    /**
     * Subscribe to topic
     *
     * @param topic    topic to subscribe
     * @param listener completion listener (null allowed)
     * @return
     */
    public void subscribe(String topic, IMqttActionListener listener) {

        if (isConnected()) {
            try {
                mClient.subscribe(topic, 0, mContext, listener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            Log.e(TAG, "cant publish message. Not connected");
        }
    }

    /**
     * Unsubscribe a topic
     *
     * @param topic    topic to unsubscribe
     * @param listener completion listener (null allowed)
     */
    public void unsubscribe(String topic, IMqttActionListener listener) {

        if (isConnected()) {
            try {
                mClient.unsubscribe(topic, mContext, listener);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            Log.e(TAG, "cant publish message. Not connected");
        }
    }

    public void addCallback(IMessageCallback callback) {
        mMessageCallbacksList.add(callback);
    }
}

With this listener IMessageCallback.java :

/**
 * Listener for the MQTT events relayed by {@code MqttHandler}: incoming
 * messages, delivery completion, and connection / disconnection outcomes.
 */
public interface IMessageCallback {

    /**
     * This method is called when the connection to the server is lost.
     *
     * @param cause the reason behind the loss of connection.
     */
    void connectionLost(Throwable cause);

    /**
     * This method is called when a message arrives from the server.
     *
     * @param topic       name of the topic the message was published to
     * @param mqttMessage the actual message
     * @throws Exception if the listener fails while handling the message
     */
    void messageArrived(String topic, MqttMessage mqttMessage) throws Exception;

    /**
     * Called when delivery for a message has been completed, and all acknowledgments have been received.
     *
     * @param messageToken the delivery token associated with the message.
     */
    void deliveryComplete(IMqttDeliveryToken messageToken);

    /**
     * Called when connection is established
     *
     * @param iMqttToken token for this connection
     */
    void onConnectionSuccess(IMqttToken iMqttToken);

    /**
     * Called when connection has failed
     *
     * @param iMqttToken token when failure occurred
     * @param throwable  exception
     */
    void onConnectionFailure(IMqttToken iMqttToken, Throwable throwable);

    /**
     * Called when disconnection is successful
     *
     * @param iMqttToken token for this connection
     */
    void onDisconnectionSuccess(IMqttToken iMqttToken);

    /**
     * Called when disconnection failed
     *
     * @param iMqttToken token when failure occurred
     * @param throwable  exception
     */
    void onDisconnectionFailure(IMqttToken iMqttToken, Throwable throwable);
}

You can use it like this:

// Example wiring: register a listener, subscribe once connected, then connect.
final MqttHandler mqttHandler = new MqttHandler(mContext);

mqttHandler.addCallback(new IMessageCallback() {
    @Override
    public void connectionLost(Throwable cause) {
        Log.v(TAG, "connectionLost", cause);
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        Log.v(TAG, "messageArrived : " + topic + " : " + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken messageToken) {
        try {
            // The token may no longer hold the message once delivery has
            // completed, so guard against null before reading the payload.
            MqttMessage delivered = messageToken.getMessage();
            if (delivered == null) {
                Log.v(TAG, "deliveryComplete");
            } else {
                Log.v(TAG, "deliveryComplete : " + new String(delivered.getPayload()));
            }
        } catch (MqttException e) {
            Log.e(TAG, "deliveryComplete : could not read delivered message", e);
        }
    }

    @Override
    public void onConnectionSuccess(IMqttToken iMqttToken) {

        Log.v(TAG, "connection success");

        // Subscribing only makes sense once the connection is up.
        mqttHandler.subscribe("test", new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                Log.v(TAG, "subscribe success");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                Log.e(TAG, "subscribe failure", exception);
            }
        });
    }

    @Override
    public void onConnectionFailure(IMqttToken iMqttToken, Throwable throwable) {
        Log.v(TAG, "connection failure", throwable);
    }

    @Override
    public void onDisconnectionSuccess(IMqttToken iMqttToken) {
        Log.v(TAG, "disconnection success");
    }

    @Override
    public void onDisconnectionFailure(IMqttToken iMqttToken, Throwable throwable) {
        Log.v(TAG, "disconnection failure", throwable);
    }
});

mqttHandler.connect();

You can find a complete working use case with the Paho MQTT Android client here.

Upvotes: 1

Related Questions