Rye
Rye

Reputation: 485

Message not received from Android MQTT publish to RabbitMQ

I have set up RabbitMQ in Docker, enabled the management web UI and the MQTT plugin, and exposed ports 1883, 8883, 5672 and 15672. I am using the Paho MqttClient in the Android app I am developing to publish a message to the broker. The connection succeeds; however, no message is received when I check the web UI and the CLI.

rabbitmqctl list_queues

Connection Page: active publisher

Channel Page: Channel page

Exchange Page: active exchanges

Queues Page: available queues

Below is the code I'm working on.

// Broker URI handed to MqttAndroidClient: tcp:// scheme on port 1883.
private static final String CONNECTION_URL = "tcp://my-app.com:1883";
// Credentials passed to setConnectionOptions() when the connection is initialised.
private static final String USERNAME = "test_user";
private static final String PASSWORD = "test_pass";
// NOTE(review): EXCHANGE and QUEUE are not referenced by any of the code shown here;
// only TOPIC is used when publishing. Presumably the broker-side exchange/queue binding
// for the MQTT topic has to be set up separately - confirm against the RabbitMQ MQTT plugin docs.
private static final String EXCHANGE = "TestExchange";
private static final String QUEUE = "TestQueue";
// MQTT topic that publishLog() publishes to.
private static final String TOPIC = "TestTopic";

// executed onCreate
/**
 * Creates the {@code MqttAndroidClient} for {@code CONNECTION_URL} and starts an
 * asynchronous connection using the options from {@link #setConnectionOptions}.
 *
 * The outcome is reported through the action listener: success is logged at debug
 * level, failure is logged at error level together with the cause so the reason
 * (bad credentials, unreachable host, ...) is visible in logcat.
 */
private void initializeMQ() {
        Log.d(TAG, "==== STARTING MQTT CONNECTION ====");

        String clientId = "Skwamiyou";
        client = new MqttAndroidClient(this, CONNECTION_URL, clientId);
        MqttConnectOptions options = setConnectionOptions(USERNAME, PASSWORD);

        try {
            // Pass the listener to connect() itself instead of attaching it to the
            // returned token afterwards, so the callback is registered before the
            // connect attempt can complete.
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(TAG, "Connected");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    // Include the throwable: without it a failed connection is undiagnosable.
                    Log.e(TAG, "Failed connection", exception);
                }
            });
        } catch (Exception e) {
            // Kept as a broad catch (as before) so a failure to start the connection
            // never crashes the caller; the stack trace now goes to logcat under TAG.
            Log.e(TAG, "Unable to start MQTT connection", e);
        }
}

/**
 * Builds the connect options used by this client.
 *
 * @param username user name sent to the broker
 * @param password password sent to the broker (converted to a char array)
 * @return options with MQTT version 3.1, a persistent (non-clean) session and
 *         automatic reconnect switched on
 */
private static MqttConnectOptions setConnectionOptions(String username, String password) {
        final MqttConnectOptions connectOptions = new MqttConnectOptions();

        // Credentials.
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());

        // Session and protocol settings.
        connectOptions.setCleanSession(false);
        connectOptions.setAutomaticReconnect(true);
        connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);

        return connectOptions;
}

// this is called on button click publish
/**
 * Publishes a numbered test message to {@code TOPIC} with QoS 1 and the retained flag set.
 *
 * Nothing is published (and the counter is not advanced) while the client is missing or
 * not connected. The "MESSAGE SENT!" toast is shown only once the publish action reports
 * success, instead of immediately after the asynchronous call was issued.
 */
public void publishLog() {
        // isConnected() stays false until the connection started in initializeMQ() is up,
        // so this also covers a button click that arrives before the connect completed.
        if (client == null || !client.isConnected()) {
            Log.w(TAG, "Publish skipped: MQTT client is not connected");
            Toast.makeText(this, "Not connected to broker", Toast.LENGTH_SHORT).show();
            return;
        }

        Log.d(TAG, "Publishing....");

        counter++;
        String payload = "Send to My MQ! - " + counter;
        // Prepared here (where `this` is the Context) and shown from the success callback.
        // NOTE(review): assumes MqttAndroidClient invokes action listeners on the main
        // thread - confirm for the Paho Android Service version in use.
        final Toast sentToast = Toast.makeText(this, "MESSAGE SENT! - " + counter, Toast.LENGTH_SHORT);

        try {
            // Explicit charset so the bytes on the wire do not depend on the platform default.
            MqttMessage message = new MqttMessage(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            message.setQos(1);
            message.setRetained(true);
            client.publish(TOPIC, message, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.d(TAG, "Publish completed");
                    sentToast.show();
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.e(TAG, "Publish failed", exception);
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, "Publish could not be started", e);
        }
}

I've been looking around for answers and tried reinstalling RabbitMQ, but I still get the same result.

Upvotes: 0

Views: 1181

Answers (1)

Shweta Chauhan
Shweta Chauhan

Reputation: 6981

Here is a common extension for connecting to MQ and receiving messages from it (MqConnectionExtention.kt):

    /**
     * Connects a new [MqttAndroidClient] and subscribes to [publishTopicChannelName] at QoS 0.
     *
     * Every message received on that subscription is forwarded to [onConnectionSuccess].
     * The subscription is made once after the initial connect succeeds and again after
     * each automatic reconnect.
     *
     * @param publishTopicChannelName topic to subscribe to
     * @param onConnectionSuccess invoked with the topic and message of each incoming message
     */
    fun Context.connectMq(publishTopicChannelName: String, onConnectionSuccess: (topic: String?, message: MqttMessage?) -> Unit) {

    val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
    val mqttAndroidClient = MqttAndroidClient(this, "tcp://34.212.00.188:1883", mClientId)

    Timber.e("ChannelName:$publishTopicChannelName")

    // Single place that (re)subscribes; used after the first connect and after a reconnect.
    fun subscribeToChannel() {
        try {
            mqttAndroidClient.subscribe(publishTopicChannelName, 0, IMqttMessageListener { topic, message ->
                onConnectionSuccess(topic, message)
            })
        } catch (ex: MqttException) {
            Log.e("TAG", "Exception whilst subscribing", ex)
        }
    }

    mqttAndroidClient.setCallback(object : MqttCallbackExtended {
        override fun connectComplete(reconnect: Boolean, serverURI: String) {
            if (reconnect) {
                Log.e("TAG", "Reconnected to : $serverURI")
                // Register the message listener again after a reconnect. (cleanSession is
                // set to false below, so this is about the client-side listener rather than
                // a lost broker-side subscription - TODO confirm it is still required.)
                subscribeToChannel()
            } else {
                Log.e("TAG", "Connected to: $serverURI")
            }
        }

        // `cause` must be nullable: Paho reports a client-initiated disconnect with a null
        // cause, and a non-null Kotlin parameter would throw on that call.
        override fun connectionLost(cause: Throwable?) {
            Log.e("TAG", "The Connection was lost.", cause)
        }

        override fun messageArrived(topic: String, message: MqttMessage) {
            // Decode the bytes; ByteArray.toString() would only print the array identity.
            Log.e("TAG", "Incoming message: " + String(message.payload))
        }

        override fun deliveryComplete(token: IMqttDeliveryToken) {}
    })

    // NOTE(review): the credentials are passed as the literal strings below, whereas the
    // publish extension reads BuildConfig values - confirm these are placeholders.
    val mqttConnectOptions = setUpConnectionOptions("MQ_CONNECTION_USERNAME", "MQ_CONNECTION_PASSWORD")
    mqttConnectOptions.isAutomaticReconnect = true
    // Overrides the cleanSession=true default applied by setUpConnectionOptions().
    mqttConnectOptions.isCleanSession = false

    try {
        mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken) {
                // Disconnected-buffer settings: enabled, size 100, not persisted,
                // oldest messages are not deleted.
                val disconnectedBufferOptions = DisconnectedBufferOptions().also { opts ->
                    opts.isBufferEnabled = true
                    opts.bufferSize = 100
                    opts.isPersistBuffer = false
                    opts.isDeleteOldestMessages = false
                }
                mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                subscribeToChannel()
            }

            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable?) {
                // Previously empty: a failed connect left no trace at all.
                Log.e("TAG", "Failed to connect to MQTT broker", exception)
            }
        })
    } catch (ex: MqttException) {
        Log.e("TAG", "Exception whilst connecting", ex)
    }
}

/**
 * Creates [MqttConnectOptions] carrying the given credentials, with clean session enabled.
 *
 * @param username user name set on the options
 * @param password password set on the options (as a char array)
 * @return the populated options object
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions =
    MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = password.toCharArray()
    }

From Java class I am calling it like below and getting message successfully:

/**
 * Subscribes to the video channel through the Kotlin connectMq extension and logs
 * the topic and payload text of every message that arrives.
 */
private void subscribeMQForVideo() {
    final String videoChannel = "mq_video_channel_name";

    MqConnectionExtentionKt.connectMq(mContext, videoChannel, (topic, mqttMessage) -> {
        // Message arrived: turn the raw payload into text before logging it.
        final String payloadText = new String(mqttMessage.getPayload());
        Log.e("TAG", "Message Video: " + topic + " : " + payloadText);

        // The Kotlin callback type expects a return value from Java lambdas.
        return null;
    });
}

To publish a message, I created a similar extension with a small difference (MqConnectionPublishExtention.kt):

    /**
     * Connects a new [MqttAndroidClient] to `BuildConfig.MQ_SERVER_URI` and hands the client
     * to [onConnectionSuccess] so the caller can publish with it.
     *
     * [onConnectionSuccess] is invoked once after the initial connect succeeds and again
     * after every automatic reconnect, so callers should expect repeated invocations.
     *
     * @param onConnectionSuccess receives the connected client
     */
    fun Context.connectMq(onConnectionSuccess: (mqttAndroidClient: MqttAndroidClient?) -> Unit) {

    val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
    val mqttAndroidClient = MqttAndroidClient(this, BuildConfig.MQ_SERVER_URI, mClientId)

    mqttAndroidClient.setCallback(object : MqttCallbackExtended {
        override fun connectComplete(reconnect: Boolean, serverURI: String) {
            if (reconnect) {
                Log.e("TAG", "Reconnected to : $serverURI")
                // Nothing is subscribed in this extension; the caller is simply notified
                // again that a usable connection exists.
                onConnectionSuccess(mqttAndroidClient)
            } else {
                Log.e("TAG", "Connected to: $serverURI")
            }
        }

        // `cause` must be nullable: Paho reports a client-initiated disconnect with a null
        // cause, and a non-null Kotlin parameter would throw on that call.
        override fun connectionLost(cause: Throwable?) {
            Log.e("TAG", "The Connection was lost.", cause)
        }

        override fun messageArrived(topic: String, message: MqttMessage) {
            // Decode the bytes; ByteArray.toString() would only print the array identity.
            Log.e("TAG", "Incoming message: " + String(message.payload))
        }

        override fun deliveryComplete(token: IMqttDeliveryToken) {}
    })

    val mqttConnectOptions = setUpConnectionOptions(BuildConfig.MQ_CONNECTION_USERNAME, BuildConfig.MQ_CONNECTION_PASSWORD)
    mqttConnectOptions.isAutomaticReconnect = true
    // Overrides the cleanSession=true default applied by setUpConnectionOptions().
    mqttConnectOptions.isCleanSession = false

    try {
        mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken) {
                // Disconnected-buffer settings: enabled, size 100, not persisted,
                // oldest messages are not deleted.
                val disconnectedBufferOptions = DisconnectedBufferOptions().also { opts ->
                    opts.isBufferEnabled = true
                    opts.bufferSize = 100
                    opts.isPersistBuffer = false
                    opts.isDeleteOldestMessages = false
                }
                mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                onConnectionSuccess(mqttAndroidClient)
            }

            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable?) {
                // Previously empty: a failed connect left no trace at all.
                Log.e("TAG", "Failed to connect to MQTT broker", exception)
            }
        })
    } catch (ex: MqttException) {
        Log.e("TAG", "Exception whilst connecting", ex)
    }
}

/**
 * Builds the base [MqttConnectOptions]: clean session on, plus the supplied credentials.
 *
 * @param username user name set on the options
 * @param password password set on the options (as a char array)
 * @return the populated options object
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions {
    val secret = password.toCharArray()
    return MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = secret
    }
}

Publish message from java class

/**
 * Connects through the Kotlin publish extension and, once a client is handed back,
 * publishes the current exercise data as a JSON payload to "Channel_name".
 *
 * The message QoS is logged after publishing; if the client reports that it is not
 * connected, the number of buffered messages is logged as well. Publishing and JSON
 * errors are written to stderr with their stack trace.
 */
private void publishExerciseDataToMQChannel() {
    MqConnectionPublishExtentionKt.connectMq(mContext, (mqttAndroidClient) -> {
        try {
            // Assemble the payload in one chained expression.
            JSONObject exerciseData = new JSONObject()
                    .put("params", mlParams)
                    .put("workoutid", workoutId)
                    .put("userid", model.getUserIdFromPrefs())
                    .put("stream_id", streamDataModel.getStreamId());

            MqttMessage message = new MqttMessage();
            byte[] body = exerciseData.toString().getBytes();
            message.setPayload(body);
            mqttAndroidClient.publish("Channel_name", message);

            Log.e("TAG", message.getQos() + "");
            boolean offline = !mqttAndroidClient.isConnected();
            if (offline) {
                Log.e("TAG", mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException | JSONException e) {
            // Both failure kinds are reported identically.
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
        // The Kotlin callback type expects a return value from Java lambdas.
        return null;
    });
}

Upvotes: 0

Related Questions