Reputation: 107
I'm trying to subscribe to a simple topic "foo" from an Eclipse Paho MQTT client.
The broker is managed by Eclipse Kapua and accessible via tcp://localhost:1883 with credentials "kapua-broker" and "kapua-password".
I'm publishing a value this way:
send(new Payload.Builder().put("testKey","testVal"),"foo");
This basically sends a map ("testKey","testVal") with topic "foo". To subscribe to this topic, I have the following code (host="localhost", port=1883):
String topic = "foo";
String broker = "tcp://"+host+":"+Integer.toString(port);
String clientId = "supply-chain-control-simulation-listener";
String username = "kapua-broker";
String password = "kapua-password";
try {
    MqttClient client = new MqttClient(broker, clientId);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(true);
    connOpts.setUserName(username);
    connOpts.setPassword(password.toCharArray());
    logger.info("Connecting to broker: "+broker);
    client.setCallback(new MqttCallback() {
        @Override
        public void connectionLost(Throwable throwable) {
            logger.info("Subscriptions stopped");
        }
        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            logger.info(s);
            // getPayload() returns byte[]; decode it rather than calling toString() on the array
            logger.info(new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    });
    client.connect(connOpts);
    if (client.isConnected())
        logger.info("Connected");
    else
        logger.error(client.getDebug().toString());
    client.subscribe(topic,2);
} catch(MqttException me) {
    logger.error("reason "+me.getReasonCode());
    logger.error("msg "+me.getMessage());
    logger.error("loc "+me.getLocalizedMessage());
    logger.error("cause "+me.getCause());
    logger.error("excep "+me);
    me.printStackTrace();
}
The connection works, but the subscription outputs this error:
15:40:03.240 [ActiveMQ NIO Worker 0] WARN o.e.k.b.c.p.KapuaSecurityBrokerFilter - User 1:kapua-broker (supply-chain-control-simulation-listener - tcp://172.17.0.1:40888 - conn id 1734706196170193882) is not authorized to read from: topic://VirtualTopic.foo
Upvotes: 0
Views: 782
Reputation: 51
In Kapua, you are allowed to publish and subscribe according to your user's permissions.
If your user has only the broker:connect permission, you can publish/subscribe only on topics of the form:
{account-name}/{connectionClientId}/{semanticTopic}
In your specific case, you should publish/subscribe on the topic:
kapua-sys/supply-chain-control-simulation-listener/foo
kapua-sys is the account name to which the user kapua-broker belongs, while supply-chain-control-simulation-listener is the clientId used to create the connection.
Please note that the username used to connect and the account name are two different things in Kapua. An account can have multiple users.
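The topic layout described above can be sketched as a small helper. This is only an illustration: KapuaTopics and scopedTopic are hypothetical names, not part of Kapua or Paho, and the sketch assumes the three segments are simply joined with "/".

```java
import java.util.Objects;

/** Hypothetical helper (not part of Kapua or Paho) for building account-scoped topics. */
final class KapuaTopics {
    private KapuaTopics() {}

    /** Joins the segments as {account-name}/{connectionClientId}/{semanticTopic}. */
    static String scopedTopic(String accountName, String clientId, String semanticTopic) {
        Objects.requireNonNull(accountName, "accountName");
        Objects.requireNonNull(clientId, "clientId");
        Objects.requireNonNull(semanticTopic, "semanticTopic");
        // Assumption: no extra prefix or escaping is applied, only "/"-separated levels.
        return String.join("/", accountName, clientId, semanticTopic);
    }
}
```

With the values from the question, scopedTopic("kapua-sys", clientId, "foo") yields the topic string to pass to client.subscribe(...) in place of the bare "foo".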
Upvotes: 2
Reputation: 22988
Do not subscribe immediately after calling connect, but instead move that call into the connectComplete callback:
IMqttAsyncClient client = new MqttAsyncClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
logger.info("Connecting to broker: "+broker);
client.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        logger.info("Connected");
        try {
            // subscribe() throws the checked MqttException, which connectComplete cannot propagate
            client.subscribe(topic,2);
        } catch (MqttException e) {
            logger.error("Subscribe failed: "+e.getMessage());
        }
    }
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("Subscriptions stopped");
    }
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        logger.info(s);
        // getPayload() returns byte[]; decode it rather than calling toString() on the array
        logger.info(new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
});
client.connect(connOpts);
That said, your error is probably coming from the MQTT broker you are using, so you need to configure it to allow access to that topic.
Upvotes: 1