Sadan A.
Sadan A.

Reputation: 1107

Unable to receive IoT device shadow update events

I am trying to build a device client that is responsible for managing the IoT device. Idea is to have a control agent running all the time in a process which is responsible for provisioning the device and communicate with it's shadows for future updates and state management. Here's the code for my control agent. Another application running in the device sends request to this control agent via zeromq topic in req-rep pattern.

class ControlAgent(DeviceAgent):

    def __init__(self):
        super().__init__()

        self.template_name = self.client_config.get("fleetProvisioningTemplateName")

        self.sw_version = importlib.metadata.version("device_client")

        self.create_keys_response = None
        self.register_thing_response = None

        self.thing_name = None
        self.is_provisioned = False

        self._initialize_device()

        if self.is_provisioned:
            self._get_mqtt_client()
            self._subscribe_to_shadows()

    def _initialize_device(self):
        """Loads device configuration on startup and subscribes to named shadows if provisioned."""
        try:
            # TODO: Need a way to get the serial number
            device_config = self.db_manager.load_device_config(serial_number="ZN012345")
            if device_config:
                logger.info(
                    "[CONTROL] Device is already provisioned. Loading device configuration..."
                )
                self.device_config = device_config

                self.thing_name = self.device_config["thingName"]
                self.is_provisioned = True
        except Exception as e:
            logger.warning(
                f"[CONTROL] No existing device configuration found. Device is not provisioned. {e}"
            )

    def _subscribe_to_shadows(self):
        logger.info(f"[CONTROL] Subscribing to named shadows {self.mqtt_client}...")

        tasks = [
            self.subscribe_to_named_shadow_updates(
                self.thing_name, "reader-config-policy"
            )
        ]
        asyncio.gather(*tasks)

    async def handle_request(self, message):
        """Handles incoming REQ-REP messages."""
        command = message.get("command")
        logger.info(f"[CONTROL] Received command {command}.")

        if command == "onboard":
            return await self.onboard(message)
        elif command == "boot":
            return await self.boot()
        elif command == "shutdown":
            return await self.shutdown()
        elif command == "config":
            return await self.get_device_config(message)
        else:
            return {"error": "Unknown command"}

    async def onboard(self, message):
        """
        Starts AWS IoT Fleet Provisioning using the device's MAC address.
        """
        serial_number = message.get("serialNumber")

        config = await self._load_device_config(serial_number)

        if config:
            logger.info("[CONTROL] Device already provisioned.")
            return {"error": "Device already provisioned."}

        logger.info("[CONTROL] Starting IoT Fleet Provisioning...")

        self.create_keys_response = None
        self.register_thing_response = None

        # TODO: validate request message.

        # Create MQTT Connection
        mqtt_client = self._get_mqtt_client(provisioning=True)
        # Initialize AWS IoT Identity Client
        identity_client = iotidentity.IotIdentityClient(mqtt_client)

        # CreateKeysAndCertificate
        await self._create_device_cert(identity_client)
        await self._wait_for("create_keys_response", 10, "CreateKeysAndCertificate")

        if not self.create_keys_response:
            return {"error": "CreateKeysAndCertificate failed"}

        # RegisterThing
        await self._register_thing(identity_client, serial_number, message)
        await self._wait_for("register_thing_response", 10, "RegisterThing")

        if not self.register_thing_response:
            return {"error": "RegisterThing failed"}

        logger.info("[CONTROL] Storing device certificates.")
        self.keystore.store_device_certificates(
            cert_pem=self.create_keys_response.certificate_pem,
            cert_key=self.create_keys_response.private_key,
        )

        logger.info("[CONTROL] Saving device configuration...")

        thing_name = self.register_thing_response.thing_name

        self.db_manager.save_device_config(
            serial_number,
            {...},
        )

        self.device_config = await self._load_device_config(serial_number)
        self.is_provisioned = True
        self.thing_name = thing_name

        # Wait for configuration file to be saved
        await asyncio.sleep(3)

        self._restart_mqtt()

        # Subscribe to named shadows
        self._subscribe_to_shadows()

        await asyncio.sleep(3)

        # Update named shadows
        await self.publish_named_shadow_update(
            thing_name, "reader-config-policy", reader_config
        )

        logger.info(f"[CONTROL] Device onboarding complete {self.mqtt_client}...")

        return {"status": "Onboarding completed."}

    def subscribe_to_named_shadow_updates(self, thing_name, shadow_name):
        """Subscribes to the shadow update accepted, rejected, and delta topics using AWS IoT SDK."""

        def on_accepted(response):
            logger.info(f"[CONTROL] Shadow '{shadow_name}' update accepted...")

        def on_rejected(error):
            logger.error(f"[CONTROL] Shadow '{shadow_name}' update rejected...")

        def on_delta(response):
            """Handles shadow delta updates and applies changes to the device."""
            logger.info(f"[CONTROL] Shadow '{shadow_name}' delta received...")
            delta_state = response.state
            try:
                self.apply_shadow_update(shadow_name, delta_state)
            except RuntimeError:
                logger.error(f"[CONTROL] Failed to apply delta for {shadow_name}...")

        try:
            # mqtt_client = self._get_mqtt_client()
            shadow_client = iotshadow.IotShadowClient(self.mqtt_client)

            # Subscribe to update accepted
            accepted, _ = shadow_client.subscribe_to_update_named_shadow_accepted(
                request=iotshadow.UpdateNamedShadowSubscriptionRequest(
                    thing_name=thing_name, shadow_name=shadow_name
                ),
                qos=mqtt5.QoS.AT_LEAST_ONCE,
                callback=on_accepted,
            )

            # Subscribe to update rejected
            rejected, _ = shadow_client.subscribe_to_update_named_shadow_rejected(
                request=iotshadow.UpdateNamedShadowSubscriptionRequest(
                    thing_name=thing_name, shadow_name=shadow_name
                ),
                qos=mqtt5.QoS.AT_LEAST_ONCE,
                callback=on_rejected,
            )

            # Subscribe to update/delta to detect changes
            delta, _ = shadow_client.subscribe_to_named_shadow_delta_updated_events(
                request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
                    thing_name=thing_name, shadow_name=shadow_name
                ),
                qos=mqtt5.QoS.AT_LEAST_ONCE,
                callback=on_delta,
            )

            # Wait for subscriptions to succeed
            accepted.result()
            rejected.result()
            delta.result()

            logger.info(
                f"[CONTROL] Successfully subscribed to {thing_name}/shadow/name/{shadow_name} shadows..."
            )

        except Exception as e:
            logger.error(
                f"[CONTROL] Failed to subscribe to shadow updates for {shadow_name}: {e}"
            )

    def _on_keys_and_cert_accepted(self, response):
        """Handles CreateKeysAndCertificate success response."""
        logger.info("[CONTROL] Received CreateKeysAndCertificate response...")
        self.create_keys_response = response

    def _on_keys_and_cert_rejected(self, rejected):
        """Handles CreateKeysAndCertificate rejection."""
        logger.error(
            f"[CONTROL] CreateKeysAndCertificate rejected: {rejected.error_message}"
        )
        # TODO: Handle rejection

    def _on_register_thing_accepted(self, response):
        """Handles RegisterThing success response."""
        logger.info("[CONTROL] Received RegisterThing response...")
        self.register_thing_response = response

    def _on_register_thing_rejected(self, rejected):
        """Handles RegisterThing rejection."""
        logger.error(f"[CONTROL] RegisterThing rejected: {rejected.error_message}")
        # TODO: Handle rejection

    def _wait_for(self, attr, timeout: int, request_name: str):
        """Waits for an event to be set or times out."""
        count = 0
        while count < timeout and getattr(self, attr) is None:
            if getattr(self, attr) is not None:
                break
            else:
                logger.info(f"[CONTROL] Waiting for {request_name}...")
            count += 1
            sleep(1)

    def publish_named_shadow_update(self, thing_name, shadow_name, state):
        """Publishes an update to an AWS IoT Named Shadow using AWS IoT SDK."""

        logger.info(f"[CONTROL] Updating Shadow '{shadow_name}' for {thing_name}...")
        try:
            #mqtt_client = self._get_mqtt_client()
            shadow_client = iotshadow.IotShadowClient(mqtt_client)

            _ = shadow_client.publish_update_named_shadow(
                request=iotshadow.UpdateNamedShadowRequest(
                    thing_name=thing_name,
                    shadow_name=shadow_name,
                    state=iotshadow.ShadowState(reported=state),
                    # version=1
                ),
                qos=mqtt5.QoS.AT_LEAST_ONCE,
            )
            _.result()
            # _.add_done_callback(_puback)
        except Exception as e:
            logger.error(f"[CONTROL] Failed to update Shadow '{shadow_name}': {e}")

    def apply_shadow_update(self, shadow_name, delta_state):
        """Applies configuration updates received from shadow update/delta events."""
        try:
            logger.info(
                f"[CONTROL] Applying update from shadow '{shadow_name}': {delta_state}"
            )
            _state = None
            if shadow_name == "reader-config-policy":
                logger.info("[CONTROL] Updating Reader Configuration...")
                self.device_config["readerConfig"].update(delta_state)
                _state = self.device_config["readerConfig"]
            else:
                logger.warning(
                    f"[CONTROL] Unknown shadow update received: {shadow_name}"
                )
                return

            # Persist updated configuration
            serial_number = self.device_config.get("serialNumber")
            self.db_manager.save_device_config(serial_number, self.device_config)
            logger.info(f"[CONTROL] Updated configuration saved for {shadow_name}")

            # Acknowledge the update by reporting the new state back to AWS IoT
            self.publish_named_shadow_update(
                self.device_config["thingName"], shadow_name, _state
            )

        except Exception as e:
            logger.error(
                f"[CONTROL] Failed to apply shadow update for {shadow_name}: {e}"
            )

    def boot(self):
        logger.info("[CONTROL] Booting device...")
        # await asyncio.sleep(1)
        return {"status": "Boot complete"}

    def shutdown(self):
        logger.info("[CONTROL] Shutting down device...")
        # await asyncio.sleep(1)
        return {"status": "Shutdown complete"}

Here's the code for device agent which has some utility methods to manage mqtt connection

class DeviceAgent:
    def __init__(self):
        self.db_manager = DBManager()
        self.keystore = KeystoreManager()

        self.client_dir = os.getenv("CLIENT_DIR")
        self.client_config_path = os.path.join(self.client_dir, "config", "client.json")

        self.client_config = self.load_client_config()
        self.device_config = None
        self.mqtt_client = None
        self.identity_client = None

        self.iot_endpoint = self.client_config.get("iotDataEndpoint")
        self.debug_mode = self.client_config.get("debugMode", False)

        self.is_provisioned = False

        self._mqtt_connection_success = Future()
        self._mqtt_connection_stop = Future()

    def _get_mqtt_client(self, provisioning=False):
        """Returns an existing MQTT client or creates a new one if needed."""
        if not self.mqtt_client:
            self.mqtt_client = self._connect_to_mqtt(provisioning=provisioning)
        return self.mqtt_client

    def _connect_to_mqtt(self, provisioning=False):
        """
        Establishes a persistent MQTT 5 connection to AWS IoT.

        - Uses **claim certificates** during provisioning.
        - Uses **device certificates** after provisioning.
        - Handles automatic reconnections.
        """

        # Select the appropriate certificates
        if provisioning:
            cert_path, key_path = self.keystore.get_claim_certificate()
            client_id = uuid.uuid4().hex
        else:
            if not self.device_config:
                logger.error("[DeviceAgent] No device configuration found for MQTT.")
                return None
            cert_path = str(self.keystore.device_cert_file)
            key_path = str(self.keystore.device_key_file)
            client_id = self.device_config["thingName"]

        logger.info(f"Connecting with {cert_path} and {key_path}")
        if not cert_path or not key_path:
            logger.error("[DeviceAgent] Missing certificates. Cannot connect to MQTT.")
            return None

        logger.info(f"[DeviceAgent] Connecting to AWS IoT as '{client_id}'...")

        # Create MQTT 5 Client
        _mqtt_client = mqtt5_client_builder.mtls_from_path(
            endpoint=self.iot_endpoint,
            port=8883,
            cert_filepath=cert_path,
            pri_key_filepath=key_path,
            client_id=client_id,
            clean_session=False,
            # keep_alive_secs=30,
            on_lifecycle_stopped=self._on_mqtt_stopped,
            on_lifecycle_connection_success=self._on_mqtt_success,
            on_lifecycle_connection_failure=self._on_mqtt_failure,
        )

        try:
            _mqtt_client.start()

            connect_success_data = self._mqtt_connection_success.result()
            connack_packet = connect_success_data.connack_packet
            # negotiated_settings = connect_success_data.negotiated_settings
            logger.info(
                f"[DeviceAgent] MQTT Connection Established {repr(connack_packet.reason_code)}..."
            )
            return _mqtt_client
        except Exception as e:
            logger.exception(f"[DeviceAgent] MQTT Connection Error: {e}")
            return

    def _on_mqtt_stopped(self, event):
        """Handles MQTT disconnections."""
        logger.warning("[DeviceAgent] MQTT connection stopped...")
        self._mqtt_connection_stop.set_result(event)

    def _on_mqtt_success(self, event):
        """Handles MQTT successful connections and prevents double Future completion."""
        logger.info("[DeviceAgent] MQTT connection success.")

        if not self._mqtt_connection_success.done():
            self._mqtt_connection_success.set_result(event)
            # Mark connection as active and reset reconnect flag
            self.mqtt_connection_active = True
            self.mqtt_reconnect_required = False
        else:
            logger.warning(
                "[DeviceAgent] MQTT success event already set. Ignoring duplicate event."
            )

    def _on_mqtt_failure(self, event):
        """Handles MQTT reconnections."""
        logger.info(f"[DeviceAgent] MQTT connection exception: {event.exception}.")

    def _disconnect_mqtt(self):
        """Gracefully disconnects the existing MQTT connection."""
        if self.mqtt_client:
            try:
                logger.info("[DeviceAgent] Disconnecting MQTT Client...")
                self.mqtt_client.stop()
                self.mqtt_client = None
            except Exception as e:
                logger.error(f"[DeviceAgent] MQTT Disconnection Error: {e}")

I start this agent in an async mode

async def _start_control_agent():
    """Starts the REQ-REP server for Control Agent only."""
    print("🚀 Starting Control Agent REQ-REP Server...")

    control_agent = ControlAgent()

    # Handles REQ-REP requests
    rep_handlers = {
        "ipc:///tmp/controlAgent": control_agent.handle_request,
    }

    zmq_listener = ZeroMQManager()
    await zmq_listener.start_listening(rep_handlers, sub_handlers=None)


def start_control_agent():
    """Entry point for starting the Control Agent via Poetry script."""
    asyncio.run(_start_control_agent())

And here's the last piece, zeromq manager

class ZeroMQManager:
    def __init__(self):
        """Initialize ZeroMQ Context and Sockets."""
        self.context = zmq.asyncio.Context()
        self.req_sockets = {}
        self.sub_sockets = {}

    async def start_rep_server(self, endpoint: str, handler):
        """
        Starts a REQ-REP server (Used by Control & Jobs Agent).
        """
        socket = self.context.socket(zmq.REP)
        socket.bind(endpoint)
        logger.info(f"REQ-REP Server listening at {endpoint}")

        while True:
            message = await socket.recv_json()
            response = await handler(message)
            await socket.send_json(response)

    async def start_subscriber(self, endpoint: str, handler):
        """
        Subscribes to a PUB-SUB channel (Used by Data Agent).
        """
        socket = self.context.socket(zmq.SUB)
        socket.connect(endpoint)
        socket.setsockopt_string(zmq.SUBSCRIBE, "")

        self.sub_sockets[endpoint] = socket
        logger.info(f"Subscribed to {endpoint}")

        while True:
            msg = await socket.recv_json()
            await handler(msg)

    async def start_listening(self, rep_handlers=None, sub_handlers=None):
        """
        Start multiple REQ-REP and PUB-SUB listeners asynchronously.
        """
        tasks = []
        if rep_handlers:
            tasks += [
                self.start_rep_server(endpoint, handler)
                for endpoint, handler in rep_handlers.items()
            ]
        if sub_handlers:
            tasks += [
                self.start_subscriber(endpoint, handler)
                for endpoint, handler in sub_handlers.items()
            ]
        await asyncio.gather(*tasks)

Now when the agent receives the onboard request:

  1. Device is provisioned successfully.
  2. As per the logs, successfully subscribed to the shadow topics.
  3. Shadow update is sent successfully, I expect the event on previously subscribed shadow update events but that doesn't happen.
  4. If I restart the process, agent initializes the device (there's a method) and it subscribes to the shadow topics again but it doesn't receive anything when I send an update to the shadow from AWS MQTT client.

I have verified all the permissions and names in AWS and this process was working earlier when I had it in an async mode. I was running into some issues with that and I decided to rewrite it in conventional way.

Upvotes: 0

Views: 66

Answers (0)

Related Questions