ender.an27

Reputation: 723

AWS IoT Core Custom Authorizer

I'm trying to use an AWS IoT Core custom authorizer as described here (https://docs.aws.amazon.com/iot/latest/developerguide/config-custom-auth.html). I developed the Lambda, and I'm able to publish through the HTTP endpoint (https://docs.aws.amazon.com/iot/latest/apireference/API_iotdata_Publish.html). I can also invoke the authorizer from the AWS CLI by running aws iot test-invoke-authorizer --authorizer-name <name> --mqtt-context "username=***,password=***,clientId=***".

However, no MQTT client I have tried works (Mosquitto, MQTT Explorer, and Paho). With those I only get a timeout on the client side, and nothing shows up on the server side. I also tried the Java AWS IoT Core SDK, where I get a TLS negotiation failure at boolean sessionPresent = connected.get(); while connecting. As the endpoint I'm using iot:Data-ATS.

There is a confusing line in the documentation (https://docs.aws.amazon.com/iot/latest/developerguide/protocols.html): a table shows that custom authentication with MQTT should use port 443, with a footnote. The footnote says something like custom auth on port 443 doesn't work, which doesn't make sense.

Any idea or help?

mosquitto_sub snippet: mosquitto_sub -d -h **** -p 443 -u username?x-amz-customauthorizer-name=*** -P test -t test --cafile /etc/ssl/certs/Amazon_Root_CA_1.pem -i ***

Paho snippet:

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;

public class Paho implements MqttCallback {
    public static void execute(){
        try {
            MqttClient client = new MqttClient("ssl://***:443","***",new MemoryPersistence());

            client.connect();
            client.setCallback(new Paho());
            while (true){
                client.publish("test","test".getBytes(StandardCharsets.UTF_8),0, false);
                Thread.sleep(500);
            }
        } catch (MqttException | InterruptedException e) {
            e.printStackTrace();
        }


    }

    @Override
    public void connectionLost(Throwable throwable) {

    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println(new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

AWS client snippet:

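import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.EventLoopGroup;
import software.amazon.awssdk.crt.io.HostResolver;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.iot.AwsIotMqttConnectionBuilder;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class AWS implements MqttClientConnectionEvents  {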
    public static void execute(){
        try(EventLoopGroup eventLoopGroup = new EventLoopGroup(1);
            HostResolver resolver = new HostResolver(eventLoopGroup);
            ClientBootstrap clientBootstrap = new ClientBootstrap(eventLoopGroup, resolver);
            AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newDefaultBuilder()) {



            builder.withBootstrap(clientBootstrap)
                    .withConnectionEventCallbacks(new AWS())
                    .withClientId("****")
                    .withEndpoint("***")
                    .withCleanSession(true);


            try(MqttClientConnection connection = builder.build()) {

                CompletableFuture<Boolean> connected = connection.connect();
                try {
                    boolean sessionPresent = connected.get();
                    System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!");
                } catch (Exception ex) {
                    ex.printStackTrace();
                    throw new RuntimeException("Exception occurred during connect", ex);
                }

                CountDownLatch countDownLatch = new CountDownLatch(10);

                CompletableFuture<Integer> subscribed = connection.subscribe("test", QualityOfService.AT_LEAST_ONCE, (message) -> {
                    String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                    System.out.println("MESSAGE: " + payload);
                    countDownLatch.countDown();
                });

                subscribed.get();

                int count = 0;
                while (count++ < 10) {
                    CompletableFuture<Integer> published = connection.publish(new MqttMessage("test", "test".getBytes(), QualityOfService.AT_LEAST_ONCE, false));
                    published.get();
                    Thread.sleep(1000);
                }

                countDownLatch.await();

                CompletableFuture<Void> disconnected = connection.disconnect();
                disconnected.get();
            }
        } catch (Exception  ex) {
            System.out.println("Exception encountered: " + ex.toString());
        }

        System.out.println("Complete!");
    }

    @Override
    public void onConnectionInterrupted(int errorCode) {

    }

    @Override
    public void onConnectionResumed(boolean sessionPresent) {

    }
}

Upvotes: 2

Views: 950

Answers (1)

user1803055

Reputation: 11

I had a similar problem with the Python version of Paho. I finally fixed it by connecting to port 443 instead of port 8883 and by adding mqtt to the ALPN protocols offered during the TLS handshake.

ALPN support was added in the C library, but at the time of writing it doesn't seem to be supported in the Java version.

Here's the relevant Python code that I used to make it work:

import ssl

import paho.mqtt.client as paho

client = paho.Client(client_id)
client.username_pw_set(
    f"{username}?x-amz-customauthorizer-name={authorizer}",
    password
)

ssl_context = ssl.create_default_context()
ssl_context.set_alpn_protocols(["mqtt"])
ssl_context.load_verify_locations("AmazonRootCA1.pem")
client.tls_set_context(ssl_context)

client.connect(host, 443)
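
If the connection still times out, it can help to check the TLS layer on its own before involving any MQTT client. The following is a minimal sketch using only the Python standard library; the helper names make_alpn_context and negotiated_alpn are made up for this illustration, and it assumes the endpoint is reachable from your machine. It opens a TLS connection on port 443 while offering mqtt via ALPN and reports which protocol the server selected.

```python
import socket
import ssl


def make_alpn_context(ca_file=None, protocols=("mqtt",)):
    """Build a verifying TLS client context that offers the given ALPN protocols.

    ca_file is an optional path to a CA bundle (for example AmazonRootCA1.pem);
    when it is None, the system trust store is used.
    """
    context = ssl.create_default_context(cafile=ca_file)
    context.set_alpn_protocols(list(protocols))
    return context


def negotiated_alpn(host, port=443, ca_file=None, timeout=10.0):
    """Connect over TLS and return the ALPN protocol the server picked.

    Returns None if the server did not select any ALPN protocol.
    Raises ssl.SSLError or OSError if the handshake or connection fails.
    """
    context = make_alpn_context(ca_file)
    with socket.create_connection((host, port), timeout=timeout) as raw_sock:
        with context.wrap_socket(raw_sock, server_hostname=host) as tls_sock:
            return tls_sock.selected_alpn_protocol()
```

Calling negotiated_alpn(host) with the same host value as above should return "mqtt" if the ALPN negotiation works as I'd expect; if it returns None or the handshake fails, the problem is most likely in the TLS setup rather than in the authorizer itself.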

Upvotes: 1
