Pradip Shenolkar
Pradip Shenolkar

Reputation: 817

Implement Eclipse MQTT Android Client using single connection instance

I am using Eclipse Paho android mqtt service in my app. I am able to subscribe and publish the messages to mqtt broker. I have couple of Activities in the app, when any activity is started it connects to broker using mqttAndroidClient.connect(null, new IMqttActionListener() {} and gets the response in mqttAndroidClient.setCallback(new MqttCallback() {}.

My questions:

  1. Is this the correct way to implement the android mqtt service ?
  2. Is there a way to use same connection and callback instance throughout the app ?

Upvotes: 4

Views: 7828

Answers (2)

Pravin Sonawane
Pravin Sonawane

Reputation: 1833

A 'better' way would be to create a Service which connects/reconnects to the MQTT Broker.

I created my own service called MqttConnectionManagerService which maintains and manages connection to the broker.

Key features of this solution:

  1. Service maintains a single instance as long as it is alive.
  2. If service is killed, Android restarts it (because START_STICKY)
  3. Service can be started when device boots.
  4. Service runs in the background and is always connected to receive notifications.
  5. If the service is alive, calling startService(..) again would trigger its onStartCommand() method (and not onCreate()). In this method, we simply check if this client is connected to the broker and connect/reconnect if required.

Sample code:

MqttConnectionManagerService

public class MqttConnectionManagerService extends Service {

    private MqttAndroidClient client;
    private MqttConnectOptions options;

    @Override
    public void onCreate() {
        super.onCreate();
        options = createMqttConnectOptions();
        client = createMqttAndroidClient();
    }


    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        this.connect(client, options);
        return START_STICKY;
    }

    private MqttConnectOptions createMqttConnectOptions() {
        //create and return options
    }

    private MqttAndroidClient createMqttAndroidClient() {
        //create and return client
    }

    public void connect(final MqttAndroidClient client, MqttConnectOptions options) {

        try {
            if (!client.isConnected()) {
                IMqttToken token = client.connect(options);
                //on successful connection, publish or subscribe as usual
                token.setActionCallback(new IMqttActionListener() {..});
                client.setCallback(new MqttCallback() {..});
            }
        } catch (MqttException e) {
            //handle e
        }
    }

}

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="...">

    <!-- Permissions required to receive BOOT_COMPLETED event -->
    <uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />

    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">

        <!-- activities go here -->

        <!-- BroadcastReceiver that starts MqttConnectionManagerService on device boot -->
        <receiver android:name=".MqttServiceStartReceiver">
            <intent-filter>
                <action android:name="android.intent.action.BOOT_COMPLETED" />
            </intent-filter>
        </receiver>

        <!-- Services required for using MQTT -->
        <service android:name="org.eclipse.paho.android.service.MqttService" />
        <service android:name=".MqttConnectionManagerService" />
    </application>

</manifest>

MqttServiceStartReceiver

public class MqttServiceStartReceiver extends BroadcastReceiver {    
    @Override
    public void onReceive(Context context, Intent intent) {
        context.startService(new Intent(context, MqttConnectionManagerService.class));
    }
}

In your Activity's onResume()

startService(new Intent(this, MqttConnectionManagerService.class));

Upvotes: 11

Gaurav
Gaurav

Reputation: 2005

Here is my Singleton implementation of MQTT Client:

    public class MQTTConnection extends ServerConnectionImpl { 
    private static String TAG = MQTTConnection.class.getSimpleName();
    private static Context mContext;
    private static MqttAndroidClient mqttAndroidClient;
    private static String clientId;
    private static MQTTConnection sMqttConnection = null;
    private MQTTConnection() {

    }

    public static MQTTConnection getInstance(Context context) {
        if (null == sMqttConnection) {
            mContext = context;
            init();
        }
        return sMqttConnection;
    }

    public static void reconnectToBroker() {
        try {
            if (sMqttConnection != null) {
                sMqttConnection.disconnect();
            }

            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void init() {
        sMqttConnection = new MQTTConnection();
        setClientId();
        connectToBroker();
    }

    private static void connectToBroker() {
        String ip = STBPreferences.getInstance(mContext).getString(Constants.KEY_MQTT_SERVER_IP, null);
        if (ip == null) {
            ip = Constants.MQTT_SERVER_IP;
        }
        final String uri = Constants.MQTT_URI_PREFIX + ip + ":" + Constants.MQTT_SERVER_PORT;
        mqttAndroidClient = new MqttAndroidClient(mContext.getApplicationContext(), uri, clientId);
        mqttAndroidClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {

                if (reconnect) {
                    LogUtil.d(TAG, "Reconnected to : " + serverURI);
                    // Because Clean Session is true, we need to re-subscribe
                    subscribeToTopic();
                } else {
                    LogUtil.d(TAG, "Connected to: " + serverURI);
                }
            }

            @Override
            public void connectionLost(Throwable cause) {
                LogUtil.d(TAG, "The Connection was lost.");
            }

            @Override
            public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                String messageReceived = new String(mqttMessage.getPayload());
                LogUtil.d(TAG, "Incoming message: " + messageReceived);
                try {
                    Gson gson = new Gson();
                    Message message = gson.fromJson(messageReceived, Message.class);
                    // Here you can send message to listeners for processing

                } catch (JsonSyntaxException e) {
                    // Something wrong with message format json
                    e.printStackTrace();
                }
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                LogUtil.d(TAG, "Message delivered");

            }
        });

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setCleanSession(false);

        try {
            mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    LogUtil.d(TAG, "connect onSuccess");
                    DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
                    disconnectedBufferOptions.setBufferEnabled(true);
                    disconnectedBufferOptions.setBufferSize(100);
                    disconnectedBufferOptions.setPersistBuffer(false);
                    disconnectedBufferOptions.setDeleteOldestMessages(false);
                    mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
                    subscribeToTopic();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogUtil.d(TAG, "Failed to connect to: " + uri);
                }
            });


        } catch (MqttException ex){
            ex.printStackTrace();
        }
    }

    public void publish(Message publishMessage) {

        try {
            Gson gson = new Gson();
            String replyJson = gson.toJson(publishMessage);

            String publishTopic = clientId + Constants.MQTT_PUB_TOPIC_APPEND;
            MqttMessage message = new MqttMessage();
            message.setPayload(replyJson.getBytes());
            mqttAndroidClient.publish(publishTopic, message);
            LogUtil.d(TAG, "Message Published");
            /*if(!mqttAndroidClient.isConnected()){
                LogUtil.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }*/
        } catch (MqttException e) {
            LogUtil.d(TAG, "Error Publishing: " + e.getMessage());
            e.printStackTrace();
        } catch (NullPointerException e) {
            e.printStackTrace();
            if (mqttAndroidClient == null) {
                init();
            }
        }
    }

    private static void subscribeToTopic() {

        try {
            String subscriptionTopic = clientId + Constants.MQTT_SUB_TOPIC_APPEND;
            mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    LogUtil.d(TAG, "subscribe onSuccess");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    LogUtil.d(TAG, "Failed to subscribe");
                }
            });

        } catch (MqttException ex){
            System.err.println("Exception whilst subscribing");
            ex.printStackTrace();
        }
    }

    public void unSubscribe() {
        LogUtil.d(TAG, "unSubscribe");
        final String topic = "foo/bar";
        try {
            IMqttToken unsubToken = mqttAndroidClient.unsubscribe(topic);
            unsubToken.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // The subscription could successfully be removed from the client
                    LogUtil.d(TAG, "unSubscribe onSuccess");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken,
                                      Throwable exception) {
                    LogUtil.d(TAG, "unSubscribe onFailure");
                    // some error occurred, this is very unlikely as even if the client
                    // did not had a subscription to the topic the unsubscribe action
                    // will be successfully
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void disconnect() {
        LogUtil.d(TAG, "disconnect");
        try {
            IMqttToken disconToken = mqttAndroidClient.disconnect();
            disconToken.setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    // we are now successfully disconnected
                    LogUtil.d(TAG, "disconnect onSuccess");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken,
                                      Throwable exception) {
                    LogUtil.d(TAG, "disconnect onFailure");
                    // something went wrong, but probably we are disconnected anyway
                }
            });
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private static void setClientId() {
        String srNo = STBPreferences.getInstance(mContext).getString(Constants.STB_SERIAL_NO, null);
        clientId = srNo;
    }

    private String getClientId() {
        if (clientId == null) {
            setClientId();
        }
        return clientId;
    }

    @Override
    public boolean isInternetEnabled() {
        return NetworkUtility.isNetworkAvailable(mContext);
    }

    @Override
    public void sendMessage(Message message) {
        publish(message);
    }

    @Override
    public void reconnect() {
        reconnectToBroker();
    }
}

Here is the message Model. Change Model class for your need.

public class Message {

    /**
     * Type of data
     */
    @SerializedName("type")
    private String type;
    /**
     * Name of component
     */
    @SerializedName("name")
    private String name;
    /**
     * Data in text format
     */
    @Expose
    @SerializedName("data")
    private Object data;

    public Message(String type, String name, Object data) {
        this.type = type;
        this.name = name;
        this.data = data;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "Message{" +
                "type=" + type + "\n" +
                "name=" + name + "\n" +
                "data=" + data.toString() +
                '}';
    }
}

Get MQTT instance in your activity

MQTTConnection mqttConnection = HTTPConnection.getInstance(mContext);

Publish Message

mqttConnectin.sendMessage(new Message( ... ));

EDIT 1: Here is my ServerConnectionImpl class for your reference.

public class ServerConnectionImpl extends ConfigurationChangeListenerImpl implements ServerConnection {

/**
 * Logging TAG
 */
private static final String TAG = ServerConnectionImpl.class.getSimpleName();
/**
 * List of all listener which are registered for messages received
 */
private static ArrayList<ConfigurationChangeListenerImpl> sConfigurationChangeListeners = new ArrayList<>();

@Override
public boolean isInternetEnabled() {
    return false;
}

@Override
public ResponseData getSubscriptionDetails(String serialNumber) {
    return null;
}

@Override
public void sendMessage(Message message, WebSocket webSocket) {

}

@Override
public void sendMessage(Message message) {

}

@Override
public void sendMessageToAll(Message message) {

}

//@Override
public static void notifyListeners(int config, Message message, WebSocket wc) {
    switch (config) {
        case Configs.CAMERA: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onCameraServerChanged();
            }
            break;
        }
        case Configs.GESTURE: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onGestureCommandServerChanged();
            }
            break;
        }
        case Configs.MOTION_SENSOR: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onMotionSensorServerChanged();
            }
            break;
        }
        case Configs.MESSAGE: {
            for (ConfigurationChangeListenerImpl l : sConfigurationChangeListeners) {
                l.onMessageReceived(message, wc);
            }
            break;
        }
    }
}
/**
 * Adds listener to listen to messages.
 *
 * @param listener
 */
@Override
public synchronized void addListener(ConfigurationChangeListenerImpl listener) {
    LogUtil.d(TAG, "addListener()");
    if (listener == null) {
        throw new IllegalArgumentException("Invalid listener " + listener);
    }
    sConfigurationChangeListeners.add(listener);
}

/**
 * Removes the listener
 *
 * @param listener
 */
@Override
public synchronized void removeListener(ConfigurationChangeListenerImpl listener) {
    LogUtil.d(TAG, "removeListener()");
    if (listener == null) {
        throw new IllegalArgumentException("Invalid listener " + listener);
    }
    sConfigurationChangeListeners.remove(listener);
}

@Override
public void updateState() {

}

@Override
public void reconnect() {

}

}

You can use your own implementation for ServerConnectionImpl class.

Upvotes: 2

Related Questions