Reputation: 1107
I am trying to build a device client that is responsible for managing the IoT device. Idea is to have a control agent running all the time in a process which is responsible for provisioning the device and communicate with it's shadows for future updates and state management. Here's the code for my control agent. Another application running in the device sends request to this control agent via zeromq topic in req-rep pattern.
class ControlAgent(DeviceAgent):
def __init__(self):
super().__init__()
self.template_name = self.client_config.get("fleetProvisioningTemplateName")
self.sw_version = importlib.metadata.version("device_client")
self.create_keys_response = None
self.register_thing_response = None
self.thing_name = None
self.is_provisioned = False
self._initialize_device()
if self.is_provisioned:
self._get_mqtt_client()
self._subscribe_to_shadows()
def _initialize_device(self):
"""Loads device configuration on startup and subscribes to named shadows if provisioned."""
try:
# TODO: Need a way to get the serial number
device_config = self.db_manager.load_device_config(serial_number="ZN012345")
if device_config:
logger.info(
"[CONTROL] Device is already provisioned. Loading device configuration..."
)
self.device_config = device_config
self.thing_name = self.device_config["thingName"]
self.is_provisioned = True
except Exception as e:
logger.warning(
f"[CONTROL] No existing device configuration found. Device is not provisioned. {e}"
)
def _subscribe_to_shadows(self):
logger.info(f"[CONTROL] Subscribing to named shadows {self.mqtt_client}...")
tasks = [
self.subscribe_to_named_shadow_updates(
self.thing_name, "reader-config-policy"
)
]
asyncio.gather(*tasks)
async def handle_request(self, message):
"""Handles incoming REQ-REP messages."""
command = message.get("command")
logger.info(f"[CONTROL] Received command {command}.")
if command == "onboard":
return await self.onboard(message)
elif command == "boot":
return await self.boot()
elif command == "shutdown":
return await self.shutdown()
elif command == "config":
return await self.get_device_config(message)
else:
return {"error": "Unknown command"}
async def onboard(self, message):
"""
Starts AWS IoT Fleet Provisioning using the device's MAC address.
"""
serial_number = message.get("serialNumber")
config = await self._load_device_config(serial_number)
if config:
logger.info("[CONTROL] Device already provisioned.")
return {"error": "Device already provisioned."}
logger.info("[CONTROL] Starting IoT Fleet Provisioning...")
self.create_keys_response = None
self.register_thing_response = None
# TODO: validate request message.
# Create MQTT Connection
mqtt_client = self._get_mqtt_client(provisioning=True)
# Initialize AWS IoT Identity Client
identity_client = iotidentity.IotIdentityClient(mqtt_client)
# CreateKeysAndCertificate
await self._create_device_cert(identity_client)
await self._wait_for("create_keys_response", 10, "CreateKeysAndCertificate")
if not self.create_keys_response:
return {"error": "CreateKeysAndCertificate failed"}
# RegisterThing
await self._register_thing(identity_client, serial_number, message)
await self._wait_for("register_thing_response", 10, "RegisterThing")
if not self.register_thing_response:
return {"error": "RegisterThing failed"}
logger.info("[CONTROL] Storing device certificates.")
self.keystore.store_device_certificates(
cert_pem=self.create_keys_response.certificate_pem,
cert_key=self.create_keys_response.private_key,
)
logger.info("[CONTROL] Saving device configuration...")
thing_name = self.register_thing_response.thing_name
self.db_manager.save_device_config(
serial_number,
{...},
)
self.device_config = await self._load_device_config(serial_number)
self.is_provisioned = True
self.thing_name = thing_name
# Wait for configuration file to be saved
await asyncio.sleep(3)
self._restart_mqtt()
# Subscribe to named shadows
self._subscribe_to_shadows()
await asyncio.sleep(3)
# Update named shadows
await self.publish_named_shadow_update(
thing_name, "reader-config-policy", reader_config
)
logger.info(f"[CONTROL] Device onboarding complete {self.mqtt_client}...")
return {"status": "Onboarding completed."}
def subscribe_to_named_shadow_updates(self, thing_name, shadow_name):
"""Subscribes to the shadow update accepted, rejected, and delta topics using AWS IoT SDK."""
def on_accepted(response):
logger.info(f"[CONTROL] Shadow '{shadow_name}' update accepted...")
def on_rejected(error):
logger.error(f"[CONTROL] Shadow '{shadow_name}' update rejected...")
def on_delta(response):
"""Handles shadow delta updates and applies changes to the device."""
logger.info(f"[CONTROL] Shadow '{shadow_name}' delta received...")
delta_state = response.state
try:
self.apply_shadow_update(shadow_name, delta_state)
except RuntimeError:
logger.error(f"[CONTROL] Failed to apply delta for {shadow_name}...")
try:
# mqtt_client = self._get_mqtt_client()
shadow_client = iotshadow.IotShadowClient(self.mqtt_client)
# Subscribe to update accepted
accepted, _ = shadow_client.subscribe_to_update_named_shadow_accepted(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name=thing_name, shadow_name=shadow_name
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_accepted,
)
# Subscribe to update rejected
rejected, _ = shadow_client.subscribe_to_update_named_shadow_rejected(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name=thing_name, shadow_name=shadow_name
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_rejected,
)
# Subscribe to update/delta to detect changes
delta, _ = shadow_client.subscribe_to_named_shadow_delta_updated_events(
request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
thing_name=thing_name, shadow_name=shadow_name
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
callback=on_delta,
)
# Wait for subscriptions to succeed
accepted.result()
rejected.result()
delta.result()
logger.info(
f"[CONTROL] Successfully subscribed to {thing_name}/shadow/name/{shadow_name} shadows..."
)
except Exception as e:
logger.error(
f"[CONTROL] Failed to subscribe to shadow updates for {shadow_name}: {e}"
)
def _on_keys_and_cert_accepted(self, response):
"""Handles CreateKeysAndCertificate success response."""
logger.info("[CONTROL] Received CreateKeysAndCertificate response...")
self.create_keys_response = response
def _on_keys_and_cert_rejected(self, rejected):
"""Handles CreateKeysAndCertificate rejection."""
logger.error(
f"[CONTROL] CreateKeysAndCertificate rejected: {rejected.error_message}"
)
# TODO: Handle rejection
def _on_register_thing_accepted(self, response):
"""Handles RegisterThing success response."""
logger.info("[CONTROL] Received RegisterThing response...")
self.register_thing_response = response
def _on_register_thing_rejected(self, rejected):
"""Handles RegisterThing rejection."""
logger.error(f"[CONTROL] RegisterThing rejected: {rejected.error_message}")
# TODO: Handle rejection
def _wait_for(self, attr, timeout: int, request_name: str):
"""Waits for an event to be set or times out."""
count = 0
while count < timeout and getattr(self, attr) is None:
if getattr(self, attr) is not None:
break
else:
logger.info(f"[CONTROL] Waiting for {request_name}...")
count += 1
sleep(1)
def publish_named_shadow_update(self, thing_name, shadow_name, state):
"""Publishes an update to an AWS IoT Named Shadow using AWS IoT SDK."""
logger.info(f"[CONTROL] Updating Shadow '{shadow_name}' for {thing_name}...")
try:
#mqtt_client = self._get_mqtt_client()
shadow_client = iotshadow.IotShadowClient(mqtt_client)
_ = shadow_client.publish_update_named_shadow(
request=iotshadow.UpdateNamedShadowRequest(
thing_name=thing_name,
shadow_name=shadow_name,
state=iotshadow.ShadowState(reported=state),
# version=1
),
qos=mqtt5.QoS.AT_LEAST_ONCE,
)
_.result()
# _.add_done_callback(_puback)
except Exception as e:
logger.error(f"[CONTROL] Failed to update Shadow '{shadow_name}': {e}")
def apply_shadow_update(self, shadow_name, delta_state):
"""Applies configuration updates received from shadow update/delta events."""
try:
logger.info(
f"[CONTROL] Applying update from shadow '{shadow_name}': {delta_state}"
)
_state = None
if shadow_name == "reader-config-policy":
logger.info("[CONTROL] Updating Reader Configuration...")
self.device_config["readerConfig"].update(delta_state)
_state = self.device_config["readerConfig"]
else:
logger.warning(
f"[CONTROL] Unknown shadow update received: {shadow_name}"
)
return
# Persist updated configuration
serial_number = self.device_config.get("serialNumber")
self.db_manager.save_device_config(serial_number, self.device_config)
logger.info(f"[CONTROL] Updated configuration saved for {shadow_name}")
# Acknowledge the update by reporting the new state back to AWS IoT
self.publish_named_shadow_update(
self.device_config["thingName"], shadow_name, _state
)
except Exception as e:
logger.error(
f"[CONTROL] Failed to apply shadow update for {shadow_name}: {e}"
)
def boot(self):
logger.info("[CONTROL] Booting device...")
# await asyncio.sleep(1)
return {"status": "Boot complete"}
def shutdown(self):
logger.info("[CONTROL] Shutting down device...")
# await asyncio.sleep(1)
return {"status": "Shutdown complete"}
Here's the code for device agent which has some utility methods to manage mqtt connection
class DeviceAgent:
def __init__(self):
self.db_manager = DBManager()
self.keystore = KeystoreManager()
self.client_dir = os.getenv("CLIENT_DIR")
self.client_config_path = os.path.join(self.client_dir, "config", "client.json")
self.client_config = self.load_client_config()
self.device_config = None
self.mqtt_client = None
self.identity_client = None
self.iot_endpoint = self.client_config.get("iotDataEndpoint")
self.debug_mode = self.client_config.get("debugMode", False)
self.is_provisioned = False
self._mqtt_connection_success = Future()
self._mqtt_connection_stop = Future()
def _get_mqtt_client(self, provisioning=False):
"""Returns an existing MQTT client or creates a new one if needed."""
if not self.mqtt_client:
self.mqtt_client = self._connect_to_mqtt(provisioning=provisioning)
return self.mqtt_client
def _connect_to_mqtt(self, provisioning=False):
"""
Establishes a persistent MQTT 5 connection to AWS IoT.
- Uses **claim certificates** during provisioning.
- Uses **device certificates** after provisioning.
- Handles automatic reconnections.
"""
# Select the appropriate certificates
if provisioning:
cert_path, key_path = self.keystore.get_claim_certificate()
client_id = uuid.uuid4().hex
else:
if not self.device_config:
logger.error("[DeviceAgent] No device configuration found for MQTT.")
return None
cert_path = str(self.keystore.device_cert_file)
key_path = str(self.keystore.device_key_file)
client_id = self.device_config["thingName"]
logger.info(f"Connecting with {cert_path} and {key_path}")
if not cert_path or not key_path:
logger.error("[DeviceAgent] Missing certificates. Cannot connect to MQTT.")
return None
logger.info(f"[DeviceAgent] Connecting to AWS IoT as '{client_id}'...")
# Create MQTT 5 Client
_mqtt_client = mqtt5_client_builder.mtls_from_path(
endpoint=self.iot_endpoint,
port=8883,
cert_filepath=cert_path,
pri_key_filepath=key_path,
client_id=client_id,
clean_session=False,
# keep_alive_secs=30,
on_lifecycle_stopped=self._on_mqtt_stopped,
on_lifecycle_connection_success=self._on_mqtt_success,
on_lifecycle_connection_failure=self._on_mqtt_failure,
)
try:
_mqtt_client.start()
connect_success_data = self._mqtt_connection_success.result()
connack_packet = connect_success_data.connack_packet
# negotiated_settings = connect_success_data.negotiated_settings
logger.info(
f"[DeviceAgent] MQTT Connection Established {repr(connack_packet.reason_code)}..."
)
return _mqtt_client
except Exception as e:
logger.exception(f"[DeviceAgent] MQTT Connection Error: {e}")
return
def _on_mqtt_stopped(self, event):
"""Handles MQTT disconnections."""
logger.warning("[DeviceAgent] MQTT connection stopped...")
self._mqtt_connection_stop.set_result(event)
def _on_mqtt_success(self, event):
"""Handles MQTT successful connections and prevents double Future completion."""
logger.info("[DeviceAgent] MQTT connection success.")
if not self._mqtt_connection_success.done():
self._mqtt_connection_success.set_result(event)
# Mark connection as active and reset reconnect flag
self.mqtt_connection_active = True
self.mqtt_reconnect_required = False
else:
logger.warning(
"[DeviceAgent] MQTT success event already set. Ignoring duplicate event."
)
def _on_mqtt_failure(self, event):
"""Handles MQTT reconnections."""
logger.info(f"[DeviceAgent] MQTT connection exception: {event.exception}.")
def _disconnect_mqtt(self):
"""Gracefully disconnects the existing MQTT connection."""
if self.mqtt_client:
try:
logger.info("[DeviceAgent] Disconnecting MQTT Client...")
self.mqtt_client.stop()
self.mqtt_client = None
except Exception as e:
logger.error(f"[DeviceAgent] MQTT Disconnection Error: {e}")
I start this agent in an async mode
async def _start_control_agent():
"""Starts the REQ-REP server for Control Agent only."""
print("🚀 Starting Control Agent REQ-REP Server...")
control_agent = ControlAgent()
# Handles REQ-REP requests
rep_handlers = {
"ipc:///tmp/controlAgent": control_agent.handle_request,
}
zmq_listener = ZeroMQManager()
await zmq_listener.start_listening(rep_handlers, sub_handlers=None)
def start_control_agent():
"""Entry point for starting the Control Agent via Poetry script."""
asyncio.run(_start_control_agent())
And here's the last piece, zeromq manager
class ZeroMQManager:
def __init__(self):
"""Initialize ZeroMQ Context and Sockets."""
self.context = zmq.asyncio.Context()
self.req_sockets = {}
self.sub_sockets = {}
async def start_rep_server(self, endpoint: str, handler):
"""
Starts a REQ-REP server (Used by Control & Jobs Agent).
"""
socket = self.context.socket(zmq.REP)
socket.bind(endpoint)
logger.info(f"REQ-REP Server listening at {endpoint}")
while True:
message = await socket.recv_json()
response = await handler(message)
await socket.send_json(response)
async def start_subscriber(self, endpoint: str, handler):
"""
Subscribes to a PUB-SUB channel (Used by Data Agent).
"""
socket = self.context.socket(zmq.SUB)
socket.connect(endpoint)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.sub_sockets[endpoint] = socket
logger.info(f"Subscribed to {endpoint}")
while True:
msg = await socket.recv_json()
await handler(msg)
async def start_listening(self, rep_handlers=None, sub_handlers=None):
"""
Start multiple REQ-REP and PUB-SUB listeners asynchronously.
"""
tasks = []
if rep_handlers:
tasks += [
self.start_rep_server(endpoint, handler)
for endpoint, handler in rep_handlers.items()
]
if sub_handlers:
tasks += [
self.start_subscriber(endpoint, handler)
for endpoint, handler in sub_handlers.items()
]
await asyncio.gather(*tasks)
Now when the agent receives the onboard request:
I have verified all the permissions and names in AWS and this process was working earlier when I had it in an async mode. I was running into some issues with that and I decided to rewrite it in conventional way.
Upvotes: 0
Views: 66