Arthur Feldman
Arthur Feldman

Reputation: 107

Eclipse Kapua broker : Not authorized to subscribe to topic

I'm trying to subscribe to a simple topic "foo" from an Eclipse Paho MQTT client.

The broker is managed by Eclipse Kapua and accessible via tcp://localhost:1883 with credentials "kapua-broker" and "kapua-password".

I'm publishing a value this way:

send(new Payload.Builder().put("testKey","testVal"),"foo");

This basically sends a map ("testKey","testVal") with topic "foo". To subscribe to this topic, I have the following code (host="localhost", port=1883):

    String topic = "foo";
    String broker = "tcp://"+host+":"+Integer.toString(port);
    String clientId = "supply-chain-control-simulation-listener";
    String username = "kapua-broker";
    String password = "kapua-password";

    try {
        MqttClient client = new MqttClient(broker, clientId);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(true);
        connOpts.setUserName(username);
        connOpts.setPassword(password.toCharArray());
        connOpts.setCleanSession(true);
        logger.info("Connecting to broker: "+broker);
        client.setCallback(new MqttCallback() {
            @Override
            public void connectionLost(Throwable throwable) {
                logger.info("Subscriptions stopped");
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                logger.info(s);
                logger.info(mqttMessage.getPayload().toString());
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

            }
        });
        client.connect(connOpts);
        if (client.isConnected())
            logger.info("Connected");
        else
            logger.error(client.getDebug().toString());
        client.subscribe(topic,2);
    } catch(MqttException me) {
        logger.error("reason "+me.getReasonCode());
        logger.error("msg "+me.getMessage());
        logger.error("loc "+me.getLocalizedMessage());
        logger.error("cause "+me.getCause());
        logger.error("excep "+me);
        me.printStackTrace();
    }

The connection works, but the subscription outputs this error:

15:40:03.240 [ActiveMQ NIO Worker 0] WARN o.e.k.b.c.p.KapuaSecurityBrokerFilter - User 1:kapua-broker (supply-chain-control-simulation-listener - tcp://172.17.0.1:40888 - conn id 1734706196170193882) is not authorized to read from: topic://VirtualTopic.foo

Upvotes: 0

Views: 782

Answers (2)

Coduz
Coduz

Reputation: 51

In Kapua you are allowed to publish/subscribe according to your user permission.

If your user has only broker:connect permission you can publish/subscribe only on topic:

{account-name}/{connectionClientId}/{semanticTopic}

In your specific case you should publish/subscribe on topic:

kapus-sys/supply-chain-control-simulation-listener/foo

kapua-sys is the account name to which the user kapua-broker belongs, while supply-chain-control-simulation-listener is the clientId used to create the connection.

Please note that username used to connect and account name are two different things in Kapua. An account has multiple users.

Upvotes: 2

Alexander Farber
Alexander Farber

Reputation: 22988

Do not subscribe immediately after calling connect, but instead move that call into the connectComplete callback:

IMqttAsyncClient client = new MqttAsyncClient(broker, clientId);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
logger.info("Connecting to broker: "+broker);
client.setCallback(new MqttCallbackExtended() {
    @Override
    public void connectComplete(boolean reconnect, String brokerAddress) {
        logger.info("Connected");
        client.subscribe(topic,2);
    }
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("Subscriptions stopped");
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        logger.info(s);
        logger.info(mqttMessage.getPayload().toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
});
client.connect(connOpts);

That said, your error is probably coming from the MQTT broker you are using and you need to configure it to allow access to that topic.

Upvotes: 1

Related Questions