user10182984

Reputation: 41

How to pause a Java method until it receives a message from an MQTT broker

I have a class with 2 methods, each publishing and subscribing to a common MQTT broker separately.

public class operations{
    public void fetch(){
    // do something

     mqttConn.mqttSubscriberFetch(MQTTtopic);

    // do something
   }

    public void store(){
   // do something

    mqttConn.mqttSubscriberStore(MQTTtopic);

   // do something
   }

}

The MQTT class used by the fetch method is as follows:

public class mqttConnectionFetch implements  MqttCallback{

    MqttClient client;

    String topic;

    public mqttConnectionFetch() {
    }


    public void mqttSubscriberFetch(String topic) {

        final String MQTT_BROKER = "tcp://localhost:1883" ;
        final String CLIENT_ID = "fetch";
        try {
            client = new MqttClient(MQTT_BROKER, CLIENT_ID);
            client.connect();
            client.setCallback(this);
            client.subscribe(topic);
            MqttMessage message = new MqttMessage();

        } catch (MqttException e) {
            e.printStackTrace();
        }
    }



    @Override
    public void connectionLost(Throwable cause) {
        // TODO Auto-generated method stub

    }

    @Override
    public void messageArrived(String topic, MqttMessage message)
            throws Exception {

        System.out.println("the message received is "+message);

        if(message.toString().equals("Processed")) {
            MqttPublisherFetch("Processed", "operations/fetch");

        }

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub

    }


    public void MqttPublisherFetch(String message, String topic) {
        final String MQTT_BROKER = "tcp://localhost:1883" ;
        final String CLIENT_ID = "store";
        try {
            client = new MqttClient(MQTT_BROKER, CLIENT_ID);
            client.connect();
            createMqttMessage(message,topic);
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    private void createMqttMessage(String message, String topic) throws MqttException {
        MqttMessage publishMessage = new MqttMessage();
        publishMessage.setPayload(message.getBytes());
        client.publish(topic, publishMessage);

    }


}

I want the following behavior: after my fetch method subscribes to a topic, if the message from the broker is "Processing", fetch should go into a wait state while the store method keeps working normally. When the message received is "Processed", fetch should resume.

I tried this with plain Java wait() and start(), but I am not getting the desired output. Can someone help me understand how to do this?

Upvotes: 2

Views: 1804

Answers (1)

As I understand it, the store method consists of several steps. One of the steps is sending a message via MQTT and waiting for the response. From the perspective of the caller of the store method, the call is synchronous: the caller receives a response only after the whole processing (including the asynchronous send/receive via MQTT) is done.

What you want to solve here is a classic problem: one thread needs to wait until some condition occurs in another thread. There are many ways to achieve this; see the options proposed in "How to make a Java thread wait for another thread's output?".

The simplest would be to use a CountDownLatch. I'll use the fetch method, since that is the one you provided code for.

First, modify mqttSubscriberFetch to create an internal CountDownLatch object:

// volatile: written by the thread calling mqttSubscriberFetch,
// read by the MQTT callback thread in messageArrived
private volatile CountDownLatch processingFinishedLatch;

public void mqttSubscriberFetch(String topic) {

    final String MQTT_BROKER = "tcp://localhost:1883" ;
    final String CLIENT_ID = "fetch";
    try {
        // Create the latch before subscribing, so messageArrived never
        // sees a null latch if a message arrives immediately.
        processingFinishedLatch = new CountDownLatch(1);
        client = new MqttClient(MQTT_BROKER, CLIENT_ID);
        client.setCallback(this);
        client.connect();
        client.subscribe(topic);

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

Then trigger the latch in the message-receiving callback:

@Override
public void messageArrived(String topic, MqttMessage message)
        throws Exception {

    System.out.println("the message received is "+message);

    if(message.toString().equals("Processed")) {
        MqttPublisherFetch("Processed", "operations/fetch");
        this.processingFinishedLatch.countDown();
    }

}

You also need to give callers of mqttConnectionFetch a way to wait until the latch reaches zero:

void waitForProcessingToFinish() throws InterruptedException {
     this.processingFinishedLatch.await();
}

The fetch method should use it like this:

public void fetch() throws InterruptedException {
    // do something

    mqttConn.mqttSubscriberFetch(MQTTtopic);

    // do everything that is needed to initiate processing

    // blocks here until messageArrived receives "Processed"
    mqttConn.waitForProcessingToFinish();

    // do something
}

The thread executing fetch will block in waitForProcessingToFinish until the latch reaches zero, which happens when the "Processed" message arrives.
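The waiting mechanism can be seen in isolation, without any MQTT code. The following is only a minimal sketch: LatchDemo and waitForSignal are hypothetical names, and a plain Thread stands in for the MQTT callback thread that would call messageArrived.

```java
import java.util.concurrent.CountDownLatch;

class LatchDemo {
    // Hypothetical stand-in for fetch(): waits until another thread
    // (playing the role of the MQTT callback) counts the latch down.
    static String waitForSignal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        StringBuilder log = new StringBuilder();

        Thread callback = new Thread(() -> {
            log.append("processed;"); // work done before signalling
            latch.countDown();        // releases the waiting thread
        });
        callback.start();

        latch.await();                // blocks until countDown() is called
        log.append("resumed");        // runs only after the signal
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitForSignal()); // prints "processed;resumed"
    }
}
```

The write to log before countDown() is visible to the waiting thread after await() returns, because CountDownLatch establishes a happens-before relationship between the two calls.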

This approach can easily be extended to handle the case where the message never arrives. Just use the await overload that takes a timeout:

boolean waitForProcessingToFinish(long timeout,
        TimeUnit unit) throws InterruptedException {
     // returns false if the timeout elapsed before the latch reached zero
     return this.processingFinishedLatch.await(timeout, unit);
}

fetch should check the returned value (await returns false when the timeout elapses first) and probably return an error to the caller if a timeout occurs.
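A rough sketch of that check, again without MQTT: TimeoutFetchDemo and its fetch method are hypothetical, and the brokerReplies flag merely simulates whether the "Processed" message ever arrives. The status strings are illustrative; real code would more likely throw or return an error object.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimeoutFetchDemo {
    // Hypothetical stand-in for fetch(): brokerReplies simulates whether
    // the callback thread ever counts the latch down.
    static String fetch(boolean brokerReplies, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        if (brokerReplies) {
            // Plays the role of messageArrived receiving "Processed".
            new Thread(latch::countDown).start();
        }
        // true if the latch reached zero, false if the timeout elapsed first
        boolean finished = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return finished ? "fetched" : "timed out";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(fetch(true, 2000));
        System.out.println(fetch(false, 100));
    }
}
```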

Note that this solution has a downside: the thread that is processing fetch stays blocked for the whole wait. If it is a thread from the pool used for processing incoming HTTP requests, this may limit the number of requests your server can process concurrently. There are ways to mitigate this, such as async servlets, RxJava or Spring WebFlux. The right solution depends very much on the technology you use to implement the REST API.
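As one illustration of the non-blocking idea, here is a sketch using the JDK's CompletableFuture rather than any of the frameworks named above. AsyncFetchSketch, onMessage and fetchAsync are hypothetical names; onMessage is assumed to be called from messageArrived with the payload as a string.

```java
import java.util.concurrent.CompletableFuture;

class AsyncFetchSketch {
    // Completed once by the (assumed) MQTT callback when "Processed" arrives.
    private final CompletableFuture<String> processed = new CompletableFuture<>();

    // Hypothetical hook, assumed to be invoked from messageArrived.
    void onMessage(String payload) {
        if ("Processed".equals(payload)) {
            processed.complete(payload); // triggers any attached continuation
        }
    }

    // Returns immediately; the continuation runs once the future completes,
    // so no thread is parked while waiting for the broker.
    CompletableFuture<String> fetchAsync() {
        return processed.thenApply(p -> "fetch resumed after " + p);
    }

    public static void main(String[] args) {
        AsyncFetchSketch sketch = new AsyncFetchSketch();
        CompletableFuture<String> result = sketch.fetchAsync();
        sketch.onMessage("Processing"); // ignored, result still pending
        sketch.onMessage("Processed");  // completes the future
        System.out.println(result.join());
    }
}
```

The caller gets a future back instead of blocking, which is the same shape that async servlets or reactive frameworks expect from a handler.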

Upvotes: 1
