Reputation: 67
I am developing an Android application that publishes ECG packets using the Java HiveMQ MQTT Client library to a HiveMQ broker. The application works as follows:
The user enters a userId(topic), and upon clicking a Start Publish button, a scheduled task generates data packets every 900 ms and adds them to a queue. After adding a packet to the queue, the task calls the publishAllQueuedPackets
method, which publishes the queued packets with QoS 1 via the Mqtt5AsyncClient
. Finally, the queue is cleared after publishing.
However, I am facing two recurring issues:
publishAllQueuedPackets
method. This issue is primarily observed when the client disconnects and reconnects.Note: My implementation for publishing packets may not follow best practices, as I am a beginner in MQTT and Android development. I am open to any suggestions for improvement. Also, please note that in-order publishing of packets to the broker is important.
Below is my full code for reference:
package com.example.hivemqpublisher;
import android.annotation.SuppressLint;
import android.os.Bundle;
import android.text.TextUtils;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "HiveMQPublisher";
private static final double[] DATA = {0.0, -0.004, /* ... 2500 data points ... */};
private static final int PACKET_SIZE = 125;
private static final long GENERATE_INTERVAL = 900L; // 900ms
private Mqtt5AsyncClient mqttClient;
private ScheduledExecutorService scheduler;
private int packetNumber = 0;
private Queue<JSONObject> packetQueue = new ConcurrentLinkedQueue<>();
private EditText userIdEditText;
private Button startButton, stopButton;
private TextView clientStatusTextView, lastPublishedTextView, queueEndTextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
userIdEditText = findViewById(R.id.userIdEditText);
startButton = findViewById(R.id.startPublishingButton);
stopButton = findViewById(R.id.stopPublishingButton);
clientStatusTextView = findViewById(R.id.clientStatusTextView);
lastPublishedTextView = findViewById(R.id.lastPublishedPacketTextView);
queueEndTextView = findViewById(R.id.queueEndPacketTextView);
startButton.setOnClickListener(v -> startPublishing());
stopButton.setOnClickListener(v -> stopPublishing());
Log.d(TAG, "onCreate: UI initialized.");
}
@SuppressLint("SetTextI18n")
private void startPublishing() {
Log.d(TAG, "startPublishing: Attempting to start publishing.");
String userId = userIdEditText.getText().toString().trim();
if (TextUtils.isEmpty(userId)) {
clientStatusTextView.setText("User ID is required!");
Log.e(TAG, "startPublishing: User ID is empty.");
return;
}
userIdEditText.setEnabled(false);
String topic = "hivemq/ff/" + userId;
// Create MQTT Client
Log.d(TAG, "startPublishing: Creating MQTT client.");
mqttClient = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.identifier("android_ecg_publisher_" + userId)
.serverPort(8883)
.sslWithDefaultConfig()
.automaticReconnect()
.initialDelay(1, TimeUnit.SECONDS)
.maxDelay(10, TimeUnit.SECONDS)
.applyAutomaticReconnect()
.buildAsync();
// Connect to Broker
mqttClient.connectWith()
.cleanStart(true)
.keepAlive(1)
.sessionExpiryInterval(300)
.send()
.thenAccept(connAck -> {
Log.i(TAG, "Connected to MQTT Broker successfully.");
runOnUiThread(() -> clientStatusTextView.setText("Connected to MQTT Broker"));
startScheduler(topic);
})
.exceptionally(throwable -> {
Log.e(TAG, "Failed to connect to MQTT Broker: " + throwable.getMessage(), throwable);
runOnUiThread(() -> clientStatusTextView.setText("Failed to connect: " + throwable.getMessage()));
return null;
});
startButton.setVisibility(View.GONE);
stopButton.setVisibility(View.VISIBLE);
}
@SuppressLint("SetTextI18n")
private void stopPublishing() {
Log.d(TAG, "stopPublishing: Attempting to stop publishing.");
if (scheduler != null) {
scheduler.shutdownNow();
Log.d(TAG, "stopPublishing: Scheduler shut down.");
}
packetQueue.clear(); // Clear the queue to reset
Log.d(TAG, "stopPublishing: Packet queue cleared.");
if (mqttClient != null && mqttClient.getState().isConnected()) {
mqttClient.disconnect()
.thenRun(() -> Log.i(TAG, "MQTT Client disconnected"))
.exceptionally(throwable -> {
Log.e(TAG, "Failed to disconnect MQTT client: " + throwable.getMessage(), throwable);
return null;
});
}
packetNumber = 1; // Reset the packet number
runOnUiThread(() -> {
startButton.setVisibility(View.VISIBLE);
stopButton.setVisibility(View.GONE);
clientStatusTextView.setText("Publishing stopped");
lastPublishedTextView.setText("Last Published Packet No: 0");
queueEndTextView.setText("Queue End Packet No: 0");
});
}
private void startScheduler(String topic) {
Log.d(TAG, "startScheduler: Starting scheduler for packet generation.");
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleWithFixedDelay(() -> generatePacket(topic), 0, GENERATE_INTERVAL, TimeUnit.MILLISECONDS);
}
private void generatePacket(String topic) {
try {
Log.d(TAG, "generatePacket: Preparing data for packetNumber: " + packetNumber);
int startIdx = (packetNumber * PACKET_SIZE) % DATA.length;
// Prepare Data Packet
JSONArray jsonArray = new JSONArray();
for (int i = startIdx; i < startIdx + PACKET_SIZE; i++) {
jsonArray.put(DATA[i % DATA.length]);
}
JSONObject packetJson = new JSONObject();
packetJson.put("type", "livecg-data");
packetJson.put("data", jsonArray);
packetJson.put("packetNo", packetNumber);
packetJson.put("Timestamp", System.currentTimeMillis());
packetQueue.add(packetJson);
Log.d(TAG, "generatePacket: Packet added to queue: " + packetNumber);
packetNumber++;
runOnUiThread(() -> queueEndTextView.setText("Queue End Packet No: " + (packetNumber - 1)));
publishAllQueuedPackets(topic);
} catch (Exception e) {
Log.e(TAG, "generatePacket: Error preparing packet: " + e.getMessage(), e);
}
}
private void publishAllQueuedPackets(String topic) {
if (packetQueue.isEmpty()) {
return; // Nothing to publish
}
JSONArray batch = new JSONArray();
AtomicInteger lastPacketNumber = new AtomicInteger(-1); // Atomic wrapper for lambda compatibility
// Build the batch and keep track of the last packet number
for (JSONObject packet : packetQueue) {
batch.put(packet);
lastPacketNumber.set(packet.optInt("packetNo", -1)); // Update the atomic variable
}
mqttClient.publishWith()
.topic(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.payload(batch.toString().getBytes())
.send()
.thenAccept(publishResult -> {
int finalPacketNumber = lastPacketNumber.get(); // Get the last packet number
Log.i(TAG, "publishAllQueuedPackets: Published batch successfully. Last packet: " + finalPacketNumber);
// Update UI with the last published packet
runOnUiThread(() -> {
lastPublishedTextView.setText("Last Published Packet No: " + finalPacketNumber);
});
// Clear the queue after successful publish
packetQueue.clear();
})
.exceptionally(throwable -> {
Log.e(TAG, "publishAllQueuedPackets: Failed to publish batch: " + throwable.getMessage(), throwable);
// Retain the queue for retrying later
return null;
});
}
}
Upvotes: 0
Views: 37