Elliott B

Reputation: 1199

How to make the paho-mqtt loop time out?

Using Python's paho-mqtt, I'm trying to receive exactly one message from a topic and then exit. If no message arrives within 5 seconds, I want to time out and exit. The problem is that this code always waits the full 5 seconds, even if a message was received sooner. If I remove the sleep() line, it doesn't receive any message at all.

import paho.mqtt.client as mqtt
import logging
import time

def on_message(client, userdata, message):
    logging.info(f"Received message: {message.payload}")
    client.loop_stop()

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.enable_logger()
mqttc.on_message = on_message

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.subscribe("#")
mqttc.loop_start()

time.sleep(5)

logging.info('Complete!')

Output:

2025-01-28 01:39:33,511 [DEBUG] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
2025-01-28 01:39:33,511 [DEBUG] Sending SUBSCRIBE (d0, m1) [(b'#', 0)]
2025-01-28 01:39:33,632 [DEBUG] Received CONNACK (0, 0)
2025-01-28 01:39:33,634 [DEBUG] Received SUBACK
2025-01-28 01:39:33,634 [DEBUG] Received PUBLISH (d0, q0, r1, m0), 'AhoyDTU0600-Streusser/version', ...  (6 bytes)
2025-01-28 01:39:33,634 [INFO] Received message: b'0.8.83'
2025-01-28 01:39:38,517 [INFO] Complete!

Upvotes: 1

Views: 35

Answers (1)

mpivet-p

Reputation: 244

In your code, loop_start() runs the network loop in a new background thread. Calling loop_stop() from the on_message callback only asks that loop to stop (it does not close the connection), and your main thread keeps executing time.sleep(5) regardless, so the script always takes the full 5 seconds.
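The effect can be reproduced without a broker. The sketch below is only a stand-in and uses no paho code: background_work is a hypothetical function playing the role of the network thread, and the 0.1 s and 1 s durations are arbitrary values chosen to keep the demo short.

```python
import threading
import time

def background_work(done_at):
    # Stands in for the paho network thread: it finishes almost immediately.
    time.sleep(0.1)
    done_at.append(time.monotonic())

start = time.monotonic()
done_at = []
worker = threading.Thread(target=background_work, args=(done_at,), daemon=True)
worker.start()

# The main thread is not notified when the worker finishes;
# time.sleep() always runs for its full duration.
time.sleep(1)
elapsed = time.monotonic() - start

print(f"worker finished after {done_at[0] - start:.2f}s")
print(f"main thread resumed after {elapsed:.2f}s")
```

The worker is done long before the main thread wakes up, which is the same situation as the message arriving early while time.sleep(5) is still running.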

A possible solution is to add a threading.Event that is set once a message is received, and have the main thread wait on that event with a 5-second timeout:

import paho.mqtt.client as mqtt
import logging
import threading

# Create an event to signal when a message is received
message_received_event = threading.Event()

def on_message(client, userdata, message):
    logging.info(f"Received message: {message.payload}")

    # Signal that a message has been received
    message_received_event.set()

    client.loop_stop()

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.enable_logger()
mqttc.on_message = on_message

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.subscribe("#")
mqttc.loop_start()

# Wait for a message or timeout after 5 seconds
message_received_event.wait(timeout=5)

logging.info('Complete!') 
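If you also need to know whether a message actually arrived or the wait timed out, Event.wait() returns True when the event was set and False on timeout. The following is a minimal sketch of that idea wrapped in a helper. The name receive_one and its parameters are hypothetical, not part of paho-mqtt. It assumes the client you pass in has already had connect() called on it, and it moves the cleanup (disconnect() and loop_stop()) into the main thread rather than the callback.

```python
import threading

def receive_one(client, topic, timeout=5.0):
    """Return the first message seen on `topic`, or None if `timeout` elapses.

    `client` is assumed to be a paho-mqtt client on which connect() has
    already been called (hypothetical helper, not a paho-mqtt API).
    """
    received = threading.Event()
    result = []

    def on_message(client, userdata, message):
        # Keep only the first message; later ones are ignored.
        if not received.is_set():
            result.append(message)
            received.set()

    client.on_message = on_message
    client.subscribe(topic)
    client.loop_start()
    try:
        # wait() returns True if the event was set, False on timeout.
        got_one = received.wait(timeout=timeout)
    finally:
        # Clean up from the main thread, whether or not a message arrived.
        client.disconnect()
        client.loop_stop()
    return result[0] if got_one else None
```

With the client from the code above, this could be called as message = receive_one(mqttc, "#", timeout=5) right after mqttc.connect(...), followed by a check for message is None to detect the timeout.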

Upvotes: 1
