Reputation: 41
I have a class with 2 methods, each publishing and subscribing to a common MQTT broker separately.
public class operations{
public void fetch(){
// do something
mqttConn.mqttSubscriberFetch(MQTTtopic);
// do something
}
public void store(){
// do something
mqttConn.mqttSubscriberStore(MQTTtopic);
// do something
}
}
And the MQTT method for method fetch is as follows:
public class mqttConnectionFetch implements MqttCallback{
MqttClient client;
String topic;
public mqttConnectionFetch() {
}
public void mqttSubscriberFetch(String topic) {
final String MQTT_BROKER = "tcp://localhost:1883" ;
final String CLIENT_ID = "fetch";
try {
client = new MqttClient(MQTT_BROKER, CLIENT_ID);
client.connect();
client.setCallback(this);
client.subscribe(topic);
MqttMessage message = new MqttMessage();
} catch (MqttException e) {
e.printStackTrace();
}
}
@Override
public void connectionLost(Throwable cause) {
// TODO Auto-generated method stub
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
System.out.println("the message received is "+message);
if(message.toString().equals("Processed")) {
MqttPublisherFetch("Processed", "operations/fetch");
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub
}
public void MqttPublisherFetch(String message, String topic) {
final String MQTT_BROKER = "tcp://localhost:1883" ;
final String CLIENT_ID = "store";
try {
client = new MqttClient(MQTT_BROKER, CLIENT_ID);
client.connect();
createMqttMessage(message,topic);
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
private void createMqttMessage(String message, String topic) throws MqttException {
MqttMessage publishMessage = new MqttMessage();
publishMessage.setPayload(message.getBytes());
client.publish(topic, publishMessage);
}
}
Now i am trying to have such a functionality that whenever my fetch method is subscribing to a topic, if the message from the broker is "Processing" , it should go on wait state. While the store method should be working normally. And when the message received is "Processed", the fetch should again start working.
I tried this with normal java wait() and start(), But I am not getting the desired output. Can someone help me to demystify this ?
Upvotes: 2
Views: 1804
Reputation: 15901
As I understand the store method consists of several steps. One of the step is sending a message via MQTT and waiting for the response. From the perspective of the client of the store
method it is synchronous that is client will receive response only after the whole processing (including async send/receive via MQTT) is done.
What you want to solve here is a classical problem when one thread needs to wait until some condition happens in another thread. There are many ways to achieve this, just check the number of options proposed in How to make a Java thread wait for another thread's output?.
The simplest would be to use CountDownLatch
. I'll use fetch
method as this is the one you provided the code for.
First you would modify mqttSubscriberFetch
to create a internal CountDownLatch
object:
private CountDownLatch processingFinishedLatch;
public void mqttSubscriberFetch(String topic) {
final String MQTT_BROKER = "tcp://localhost:1883" ;
final String CLIENT_ID = "fetch";
try {
client = new MqttClient(MQTT_BROKER, CLIENT_ID);
client.connect();
client.setCallback(this);
client.subscribe(topic);
MqttMessage message = new MqttMessage();
processingFinishedLatch = new CountDownLatch(1);
} catch (MqttException e) {
e.printStackTrace();
}
}
Then you need to trigger this signal in the message receiving callback:
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
System.out.println("the message received is "+message);
if(message.toString().equals("Processed")) {
MqttPublisherFetch("Processed", "operations/fetch");
this.processingFinishedLatch.countDown();
}
}
Also you need to provide the client of the mqttConnectionFetch
a way to wait for the latch to become zero:
void waitForProcessingToFinish() throws InterruptedException {
this.processingFinishedLatch.await();
}
The fetch
method should use it like this:
public void fetch(){
// do something
mqttConn.mqttSubscriberFetch(MQTTtopic);
// do everything that is needed to initiate processing
mqttConn.waitForProcessingToFinish()
// do something
}
The thread that is executing fetch
will wait in waitForProcessingToFinish
until latch reaches zero and that will happen when the appropriate message will come.
This approach can be easily modified to fix the issue when the message never comes. Just use await with some timeout:
boolean waitForProcessingToFinish(long timeout,
TimeUnit unit) throws InterruptedException {
return this.processingFinishedLatch.await();
}
fetch
should check returned value and probably return error to the caller if timeout happens.
Note that this solution has a downside that thread that is processing fetch
will be busy all this time. If this is a thread from the thread pool used for processing incoming HTTP requests this may limit the number of requests your server can process concurrently.
There are approaches how to mitigate this like async servlets, RxJava or Spring WebFlux. The actual solution of this problem very much depends on the technology you use for REST API implementation.
Upvotes: 1