Ernani
Ernani

Reputation: 339

How to subscribe to a MQTT topic and print received messages on Eclipse (Java)

I have a microcontroller with a thermostat sending its data over Raspberry Pi to my computer using MQTT protocol. Kura is installed and working on the Raspberry.

I'm having no problems with receiving the data on Putty, but now I need to receive it on Eclipse so I can develop a program.

I managed to publish on the topic via eclipse using Paho with the following code, (which is an adaptation of this other topic Subscribe and Read MQTT Message Using PAHO):

package publish;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PublishSemInterface {
MqttClient client;

public PublishSemInterface() {}

public static void main(String[] args) {
    new PublishSemInterface().doDemo();
}

public void doDemo() {
    try {
        client = new MqttClient("tcp://192.168.0.39:1883", "user");
        client.connect();
        MqttMessage message = new MqttMessage();
        message.setPayload("Published message".getBytes());
        client.publish("sensor/temp/out", message);
        client.disconnect();
    } catch (MqttException e) {
        e.printStackTrace();
    }
}
}

But the subscribe is being a pain. I tried using the answer of the topic I mentioned above, implementing MqttCallback interface:

public class PublishSemInterface implements MqttCallback

Adding setCallback after connecting to the client and the required interface methods (I only need messageArrived):

client.setCallback(this);

@Override
public void connectionLost(Throwable cause) {}

@Override
public void messageArrived(String topic, MqttMessage message)
    throws Exception {
System.out.println(message);   
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {}

But it didn't work. I also tried using the answer from the following topic: How to read data from MQTT in Eclipse Paho?

public static void main(String[] args) {

    MqttClient client;
    MqttConnectOptions conn;

    try {
        client = new MqttClient("tcp://192.168.0.39:1883", "user");
        client.connect();
        client.setCallback(new MqttCallback() {
            public void connectionLost(Throwable cause) {}

            public void messageArrived(String topic,
                    MqttMessage message)
                            throws Exception {
                System.out.println(message.toString());
            }

            public void deliveryComplete(IMqttDeliveryToken token) {}
        });

        client.subscribe("sensor/temp/in");

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

Except that it didn't work either. In both cases, when I run the code, the console is active, but when the microcontroller send the data (which appears on Putty), instead of printing it, the program is terminated. It looks as if the messageArrived methods are not being called.

Can anyone help me with the subscription and printing on Eclipse's console?

Upvotes: 3

Views: 16037

Answers (2)

Ernani
Ernani

Reputation: 339

I've managed to make the data being send appear on the Eclipse console. It appears the ClientId was wrong, but I've also added some modifications based on the answers from the topics I've linked on my question. Here's the code:

private Map<String, Object> properties;

public void updated(Map<String, Object> properties) {
  this.properties = properties;
  String broker   = "";
  String clientId = "";
  String topic   = "";

  if(properties != null && !properties.isEmpty()) {

    broker = (String) properties.get("broker.name");
    clientId = (String) properties.get("clientId.name");
    topic = (String) properties.get("topic.name");

    doDemo(broker, clientId, topic);
  }
}

public void doDemo(String broker, String clientId, String topic) {
  MemoryPersistence persistence = new MemoryPersistence();

  try {
    MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(true);

    sampleClient.setCallback(new MqttCallback() {
      public void connectionLost(Throwable cause) {}

      public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Message: " + message.toString());
      }

      public void deliveryComplete(IMqttDeliveryToken token) {}
    });

    sampleClient.connect(connOpts);
    sampleClient.subscribe(topic);

  } catch(MqttException e) {
    e.printStackTrace();
  }
}

Upvotes: 5

stuparm
stuparm

Reputation: 553

As you can see: client.publish("sensor/temp/out", message);, your topic is sensor/temp/out. So your subscriber should be subscribed on the same topic, instead of this line: client.subscribe("sensor/temp/in");, try to subscribe for the topic: sensor/temp/out.

Also I would recommend you to create connection using additional mqtt options. Something like this:

MqttClient client = new MqttClient(serverUrl, UUID.randomUUID().toString().replace("-", "")); //clientID needs to be unique and has meaning only for mqtt broker
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("username"); //part of the password_file inside mqtt broker
options.setPassword("password".toCharArray()); //also part of password_file. Username and password might not be needed.
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60); //how often to send PINGREQ messages
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //newest version
client.connect(options);

Upvotes: 2

Related Questions