Reputation: 485
I have set up RabbitMQ in Docker, enabled the management web UI and the MQTT plugin (rabbitmq_mqtt), and exposed ports 1883, 8883, 5672 and 15672. In the Android app I am developing, I use the Paho MqttAndroidClient to publish a message to the broker. The connection succeeds; however, no message shows up when I check the web UI or the CLI.
Below is the code I'm working on.
// Broker endpoint handed to MqttAndroidClient in initializeMQ().
private static final String CONNECTION_URL = "tcp://my-app.com:1883";
// Credentials passed to setConnectionOptions() when the connection is opened.
private static final String USERNAME = "test_user";
private static final String PASSWORD = "test_pass";
// NOTE(review): EXCHANGE and QUEUE are not referenced by any code visible here; the
// publish call only names TOPIC. Presumably the RabbitMQ MQTT plugin routes publishes
// through its own topic exchange, so a queue must be bound there (routing key = topic)
// for a message to become visible in the management UI -- confirm against the broker setup.
private static final String EXCHANGE = "TestExchange";
private static final String QUEUE = "TestQueue";
// MQTT topic that publishLog() publishes to.
private static final String TOPIC = "TestTopic";
// executed onCreate
/**
 * Creates the MQTT client for CONNECTION_URL and starts an asynchronous connection
 * using the options built by setConnectionOptions(USERNAME, PASSWORD).
 *
 * The outcome is reported through the listener passed to connect(); this method
 * returns before the connection is established.
 */
private void initializeMQ() {
    Log.d(TAG, "==== STARTING MQTT CONNECTION ====");
    String clientId = "Skwamiyou";
    client = new MqttAndroidClient(this, CONNECTION_URL, clientId);
    MqttConnectOptions options = setConnectionOptions(USERNAME, PASSWORD);
    try {
        // Pass the listener to connect() itself so it is registered before the attempt
        // starts, instead of attaching it to the returned token afterwards.
        client.connect(options, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                Log.d(TAG, "Connected");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                // Include the cause: without it a refused or unreachable broker is undiagnosable.
                Log.e(TAG, "Failed connection", exception);
            }
        });
    } catch (Exception e) {
        Log.e(TAG, "Could not start MQTT connection", e);
    }
}
/**
 * Builds the connect options used by initializeMQ(): MQTT 3.1, a persistent
 * (non-clean) session, automatic reconnect, and the given credentials.
 *
 * @param username user name sent to the broker
 * @param password password sent to the broker (converted to a char array)
 * @return the populated options object
 */
private static MqttConnectOptions setConnectionOptions(String username, String password) {
    final char[] secret = password.toCharArray();
    final MqttConnectOptions connectOptions = new MqttConnectOptions();
    // Credentials first, then session/protocol behaviour; the setters are independent.
    connectOptions.setUserName(username);
    connectOptions.setPassword(secret);
    connectOptions.setAutomaticReconnect(true);
    connectOptions.setCleanSession(false);
    connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1);
    return connectOptions;
}
// this is called on button click publish
/**
 * Publishes a numbered test message to TOPIC with QoS 1 and the retained flag set.
 *
 * publish() is asynchronous, so the "MESSAGE SENT!" toast is shown from the publish
 * listener once the client reports success, not as soon as the call returns.
 * Failures are logged with their cause.
 */
public void publishLog() {
    Log.d(TAG, "Publishing....");
    counter++;
    // Capture the text now; counter may have moved on by the time the callback runs.
    final String sentNotice = "MESSAGE SENT! - " + counter;
    String payload = "Send to My MQ! - " + counter;
    try {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(1);
        message.setRetained(true);
        client.publish(TOPIC, message, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                // NOTE(review): assumes the Android client delivers callbacks on the main
                // thread, which Toast requires -- confirm for the Paho version in use.
                Toast.makeText(getApplicationContext(), sentNotice, Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                Log.e(TAG, "Publish failed", exception);
            }
        });
    } catch (MqttException e) {
        // Thrown synchronously, e.g. when the client cannot accept the message.
        Log.e(TAG, "Publish rejected", e);
    }
}
I've been looking around for answers and tried reinstalling RabbitMQ, but I still get the same result.
Upvotes: 0
Views: 1181
Reputation: 6981
Here is a common extension function for connecting to the MQ broker and receiving messages from it (MqConnectionExtention.kt):
/**
 * Connects to the MQTT broker with a fresh client id and subscribes to
 * [publishTopicChannelName] at QoS 0.
 *
 * The subscription is made once the initial connect succeeds and again after every
 * automatic reconnect. Each message received on the subscription is forwarded to
 * [onConnectionSuccess].
 *
 * @param publishTopicChannelName topic filter to subscribe to
 * @param onConnectionSuccess invoked with the topic and message of every arrival
 */
fun Context.connectMq(publishTopicChannelName: String, onConnectionSuccess: (topic: String?, message: MqttMessage?) -> Unit) {
    val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
    val mqttAndroidClient = MqttAndroidClient(this, "tcp://34.212.00.188:1883", mClientId)
    Timber.e("ChannelName:$publishTopicChannelName")

    // Single place that (re-)subscribes, shared by the initial connect and reconnects.
    fun subscribeToChannel() {
        try {
            mqttAndroidClient.subscribe(publishTopicChannelName, 0, object : IMqttMessageListener {
                override fun messageArrived(topic: String?, message: MqttMessage?) {
                    onConnectionSuccess(topic, message)
                }
            })
        } catch (ex: MqttException) {
            System.err.println("Exception whilst subscribing")
            ex.printStackTrace()
        }
    }

    mqttAndroidClient.setCallback(object : MqttCallbackExtended {
        override fun connectComplete(reconnect: Boolean, serverURI: String) {
            if (reconnect) {
                Log.e("TAG", "Reconnected to : $serverURI")
                // Re-subscribe after an automatic reconnect. The session is requested as
                // non-clean below, so the broker may still hold the subscription;
                // re-subscribing is kept as in the original.
                subscribeToChannel()
            } else {
                Log.e("TAG", "Connected to: $serverURI")
            }
        }

        // The cause is nullable: the Java callback carries no nullability contract and
        // may be invoked without a cause, which would crash a non-null Kotlin parameter.
        override fun connectionLost(cause: Throwable?) {
            Log.e("TAG", "The Connection was lost.", cause)
        }

        override fun messageArrived(topic: String, message: MqttMessage) {
            // Decode the bytes; ByteArray.toString() would only print the array identity.
            Log.e("TAG", "Incoming message: " + String(message.payload, Charsets.UTF_8))
        }

        override fun deliveryComplete(token: IMqttDeliveryToken) {}
    })

    // NOTE(review): the credentials are passed as string literals here, whereas the
    // publish variant reads them from BuildConfig -- presumably placeholders; confirm.
    val mqttConnectOptions = setUpConnectionOptions("MQ_CONNECTION_USERNAME", "MQ_CONNECTION_PASSWORD").apply {
        isAutomaticReconnect = true
        isCleanSession = false
    }

    try {
        mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken) {
                // Buffer up to 100 messages in memory while the client is disconnected.
                val disconnectedBufferOptions = DisconnectedBufferOptions().apply {
                    isBufferEnabled = true
                    bufferSize = 100
                    isPersistBuffer = false
                    isDeleteOldestMessages = false
                }
                mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                subscribeToChannel()
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                // Previously empty: a failed connect left no trace at all.
                Log.e("TAG", "Failed to connect to MQ broker", exception)
            }
        })
    } catch (ex: MqttException) {
        ex.printStackTrace()
    }
}
/**
 * Creates connect options carrying the given credentials, with the clean-session
 * flag initially set to true.
 *
 * @param username user name sent to the broker
 * @param password password sent to the broker (stored as a char array)
 * @return the configured [MqttConnectOptions]
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions =
    MqttConnectOptions().also { options ->
        options.isCleanSession = true
        options.userName = username
        options.password = password.toCharArray()
    }
From a Java class I call it as shown below and receive messages successfully:
/**
 * Subscribes to the video channel through the Kotlin connectMq extension and logs
 * the topic and payload text of every message that arrives.
 */
private void subscribeMQForVideo() {
    final String videoChannel = "mq_video_channel_name";
    MqConnectionExtentionKt.connectMq(mContext, videoChannel, (topic, mqttMessage) -> {
        // Called for each arriving message; decode the payload bytes for the log line.
        String body = new String(mqttMessage.getPayload());
        Log.e("TAG", "Message Video: " + topic + " : " + body);
        // The Kotlin callback returns Unit, so the Java lambda must return a value.
        return null;
    });
}
To publish messages, I created a similar extension with a few small differences (MqConnectionPublishExtention.kt):
/**
 * Connects to the broker at `BuildConfig.MQ_SERVER_URI` with a fresh client id and
 * hands the connected client to [onConnectionSuccess] so the caller can publish.
 *
 * [onConnectionSuccess] is invoked when the initial connect succeeds and again after
 * every automatic reconnect.
 *
 * @param onConnectionSuccess receives the client; this function always passes a
 *   non-null instance even though the parameter type is nullable
 */
fun Context.connectMq(onConnectionSuccess: (mqttAndroidClient: MqttAndroidClient?) -> Unit) {
    val mClientId = BuildConfig.CLIENT_ID + System.currentTimeMillis()
    val mqttAndroidClient = MqttAndroidClient(this, BuildConfig.MQ_SERVER_URI, mClientId)

    mqttAndroidClient.setCallback(object : MqttCallbackExtended {
        override fun connectComplete(reconnect: Boolean, serverURI: String) {
            if (reconnect) {
                Log.e("TAG", "Reconnected to : $serverURI")
                // NOTE(review): this re-runs the caller's callback on every reconnect, so a
                // callback that publishes will publish again -- confirm this is intended.
                onConnectionSuccess(mqttAndroidClient)
            } else {
                Log.e("TAG", "Connected to: $serverURI")
            }
        }

        // The cause is nullable: the Java callback carries no nullability contract and
        // may be invoked without a cause, which would crash a non-null Kotlin parameter.
        override fun connectionLost(cause: Throwable?) {
            Log.e("TAG", "The Connection was lost.", cause)
        }

        override fun messageArrived(topic: String, message: MqttMessage) {
            // Decode the bytes; ByteArray.toString() would only print the array identity.
            Log.e("TAG", "Incoming message: " + String(message.payload, Charsets.UTF_8))
        }

        override fun deliveryComplete(token: IMqttDeliveryToken) {}
    })

    val mqttConnectOptions = setUpConnectionOptions(BuildConfig.MQ_CONNECTION_USERNAME, BuildConfig.MQ_CONNECTION_PASSWORD).apply {
        isAutomaticReconnect = true
        isCleanSession = false
    }

    try {
        mqttAndroidClient.connect(mqttConnectOptions, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken) {
                // Buffer up to 100 messages in memory while the client is disconnected.
                val disconnectedBufferOptions = DisconnectedBufferOptions().apply {
                    isBufferEnabled = true
                    bufferSize = 100
                    isPersistBuffer = false
                    isDeleteOldestMessages = false
                }
                mqttAndroidClient.setBufferOpts(disconnectedBufferOptions)
                onConnectionSuccess(mqttAndroidClient)
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                // Previously empty: a failed connect left no trace at all.
                Log.e("TAG", "Failed to connect to MQ broker", exception)
            }
        })
    } catch (ex: MqttException) {
        ex.printStackTrace()
    }
}
/**
 * Builds [MqttConnectOptions] for the given credentials; the clean-session flag
 * starts out as true.
 *
 * @param username broker user name
 * @param password broker password, converted to a char array
 * @return the new options instance
 */
private fun setUpConnectionOptions(username: String, password: String): MqttConnectOptions {
    val secret = password.toCharArray()
    return MqttConnectOptions().apply {
        isCleanSession = true
        // Qualified with `this` because the parameter names would otherwise be ambiguous.
        this.userName = username
        this.password = secret
    }
}
Publishing a message from a Java class:
/**
 * Connects through the Kotlin publish extension and, once a client is handed back,
 * publishes the exercise data as a JSON document to "Channel_name".
 *
 * The JSON carries params, workoutid, userid and stream_id. After publishing, the
 * message QoS is logged, and the buffered-message count is logged when the client
 * reports that it is not connected.
 */
private void publishExerciseDataToMQChannel() {
    MqConnectionPublishExtentionKt.connectMq(mContext, (mqttAndroidClient) -> {
        try {
            JSONObject exercisePayload = new JSONObject()
                    .put("params", mlParams)
                    .put("workoutid", workoutId)
                    .put("userid", model.getUserIdFromPrefs())
                    .put("stream_id", streamDataModel.getStreamId());
            byte[] body = exercisePayload.toString().getBytes();

            MqttMessage outgoing = new MqttMessage();
            outgoing.setPayload(body);
            mqttAndroidClient.publish("Channel_name", outgoing);
            Log.e("TAG", outgoing.getQos() + "");

            boolean connected = mqttAndroidClient.isConnected();
            if (!connected) {
                Log.e("TAG", mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
        } catch (MqttException | JSONException e) {
            // Both failure kinds are reported identically.
            System.err.println("Error Publishing: " + e.getMessage());
            e.printStackTrace();
        }
        // The Kotlin callback returns Unit, so the Java lambda must return a value.
        return null;
    });
}
Upvotes: 0