Reputation: 1199
Using Python paho-mqtt, I'm trying to receive exactly one message from a topic and then exit. If no message is received within 5 seconds, I want to time out and exit. The problem is that this code always waits the full 5 seconds, even if the message was received sooner. If I remove the sleep() line, it doesn't receive any message at all.
import paho.mqtt.client as mqtt
import logging
import time
def on_message(client, userdata, message):
    logging.info(f"Received message: {message.payload}")
    client.loop_stop()
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.enable_logger()
mqttc.on_message = on_message
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.subscribe("#")
mqttc.loop_start()
time.sleep(5)
logging.info('Complete!')
Output:
2025-01-28 01:39:33,511 [DEBUG] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
2025-01-28 01:39:33,511 [DEBUG] Sending SUBSCRIBE (d0, m1) [(b'#', 0)]
2025-01-28 01:39:33,632 [DEBUG] Received CONNACK (0, 0)
2025-01-28 01:39:33,634 [DEBUG] Received SUBACK
2025-01-28 01:39:33,634 [DEBUG] Received PUBLISH (d0, q0, r1, m0), 'AhoyDTU0600-Streusser/version', ... (6 bytes)
2025-01-28 01:39:33,634 [INFO] Received message: b'0.8.83'
2025-01-28 01:39:38,517 [INFO] Complete!
Upvotes: 1
Views: 35
Reputation: 244
In your code, loop_start() starts the network loop in a new thread. Calling loop_stop() from that thread's callback only stops the loop; your main thread is still blocked in time.sleep(5) and will not continue until the full 5 seconds have passed.
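The effect can be reproduced without MQTT at all. The sketch below is only an illustration using the standard library: the function name timed_sleep_with_worker and the short durations are made up for the example, and a plain thread stands in for the paho network loop.

```python
import threading
import time


def timed_sleep_with_worker(sleep_seconds=0.5, worker_seconds=0.05):
    """Start a short-lived worker thread, then sleep in the calling thread.

    Returns (elapsed_seconds, worker_still_alive).
    """
    # The worker stands in for the network loop thread: it finishes early.
    worker = threading.Thread(target=time.sleep, args=(worker_seconds,))
    start = time.monotonic()
    worker.start()
    # The calling thread has no idea the worker is done; it sleeps regardless.
    time.sleep(sleep_seconds)
    elapsed = time.monotonic() - start
    return elapsed, worker.is_alive()
```

The worker has long since finished by the time the call returns, yet the elapsed time is still roughly sleep_seconds, which is the same thing that happens with time.sleep(5) in the question.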
A possible solution is to use a threading.Event that is set as soon as a message arrives, and have the main thread wait on that event with a 5-second timeout instead of sleeping:
import paho.mqtt.client as mqtt
import logging
import threading
# Create an event to signal when a message is received
message_received_event = threading.Event()
def on_message(client, userdata, message):
    logging.info(f"Received message: {message.payload}")
    # Signal that a message has been received
    message_received_event.set()
    client.loop_stop()
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.enable_logger()
mqttc.on_message = on_message
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.subscribe("#")
mqttc.loop_start()
# Wait for a message or timeout after 5 seconds
message_received_event.wait(timeout=5)
logging.info('Complete!')
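If it helps, the same idea can be wrapped in a small helper that also tells you whether a message arrived or the wait timed out, since Event.wait() returns True when the event was set and False on timeout. This is a minimal sketch, not part of paho-mqtt: receive_one is a hypothetical name, it assumes the client passed in is already connected, and it disconnects and stops the loop from the main thread rather than from the callback.

```python
import threading


def receive_one(client, topic, timeout=5.0):
    """Return the payload of the first message on `topic`, or None on timeout.

    `client` is assumed to be an already-connected paho-mqtt client.
    """
    received = threading.Event()
    result = {}

    def on_message(client, userdata, message):
        # Keep only the first payload; later messages are ignored.
        result.setdefault("payload", message.payload)
        received.set()

    client.on_message = on_message
    client.subscribe(topic)
    client.loop_start()
    try:
        # True if on_message fired in time, False if the timeout expired.
        got_one = received.wait(timeout=timeout)
    finally:
        # Clean up from the main thread whether or not a message arrived.
        client.disconnect()
        client.loop_stop()
    return result["payload"] if got_one else None


# Possible usage with the client from the code above (after mqttc.connect(...)):
#   payload = receive_one(mqttc, "#", timeout=5)
#   if payload is None: no message arrived within 5 seconds
```

Returning None on timeout lets the caller distinguish "nothing arrived" from a received message without inspecting the log output.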
Upvotes: 1