Reputation: 41
After years of NodeJS dev, I decided to give Python a shot. So far so good, but I just ran into a wall that I would really like some help with.
I am working on a library that communicates with a remote machine using MQTT. When invoking a function on that library, a message is posted for processing on that remote machine. Once the processing is done, it posts a new message on the bus that my library picks up on and returns the result back to the calling code (the code that invoked the library function).
In Javascript, this is done by returning a Promise, whose resolve and reject functions can be stored within the library until the remote message comes back through the broker with the result (intercepted in a different function elsewhere in the library). At that point I can simply invoke the 'resolve' function stored previously to return control to the calling code (the code that invoked the async function of my library). The calling code would simply invoke this library function using the await keyword.
Now in Python, async/await does not expose resolve and reject functions that can conveniently be stored away for later, so I suppose the logic must be implemented differently. Using a simple callback function rather than an async/await workflow works, but it becomes inconvenient when the function is invoked multiple times in sequence for similar back-and-forth communications, given that each result-handling callback is a separate function.
Here is a basic example of what this would look like in Javascript (for illustration only):
let TASKS = {};
....
mqttClient.on('message', (topic, message) => {
    if (topic == "RESULT_OK/123") {
        TASKS["123"].resolve(message);
    } else if (topic == "RESULT_KO/123") {
        TASKS["123"].reject(message);
    }
});
...
let myAsyncLibraryFunction = (someCommand) => {
    return new Promise((res, rej) => {
        TASKS["123"] = {
            resolve: res,
            reject: rej
        };
        mqttClient.publish("REQUEST/123", someCommand);
    });
}
To call this, I would simply have to do:
try {
    let response1 = await myAsyncLibraryFunction("do this");
    let response2 = await myAsyncLibraryFunction("now do that");
    ...
} catch(e) {
    ...
}
NodeJS is an event-loop-based runtime, which is why this pattern is very appropriate for those types of use cases. But this type of application logic is common when dealing with message-based disparate backends, so I am sure there are good ways of solving this in Python as well.
This is a test Python code snippet that I am working on, which attempts to use a future object to achieve something similar:
import paho.mqtt.client as mqtt
import asyncio
import threading

# Init a new asyncio event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# define a global future placeholder
_future = None

# Create MQTT client
mqttClient = mqtt.Client()

# MQTT Event - on connect
def on_connect(client, userdata, flags, rc):
    print("Connected")
    client.subscribe("YAVA/#")
    # We start a new thread to test our workflow.
    #
    # If I had done this on the current thread, then the MQTT event loop
    # would get stuck (does not process incoming and outgoing messages anymore) when
    # calling "await" on the future object later on.
    taskThread = threading.Thread(target=_simulateClient, args=())
    taskThread.start()

# MQTT Event - on incoming message
def on_message(client, userdata, msg):
    global _future
    if msg.topic.startswith("YAVA/API/TASK_DONE/") == True:
        payload = str(msg.payload.decode("utf-8", "ignore"))
        # Resolve the future object
        _future.set_result(payload)

mqttClient.on_connect = on_connect
mqttClient.on_message = on_message

# Use asyncio to call an async function and test the workflow
def _simulateClient():
    asyncio.run(performAsyncTask())

# This async function will ask for a task to be performed on a remote machine,
# and wait for the response to be sent back
async def performAsyncTask():
    result = await pubAndWhaitForResponse("YAVA/API/TASK_START", "")
    print(result)

# perform the actual MQTT async logic
async def pubAndWhaitForResponse(topic, message):
    # Create a future object that can be resolved in the MQTT event "on_message"
    global _future
    _future = asyncio.get_running_loop().create_future()
    # Publish message that will start the task execution remotely somewhere
    global mqttClient
    mqttClient.publish(topic, message)
    # Now block the thread until the future gets resolved
    result = await _future
    # Return the result
    return result

# Connect to the broker and loop_forever in the main thread
mqttClient.connect("192.168.1.70", 1883, 60)
# The MQTT library will start a new thread that will continuously
# process outgoing and incoming messages through that separate thread.
# The main thread will be blocked so that the program does not exit
mqttClient.loop_forever()
It all runs fine, but the _future.set_result(payload) line does not seem to resolve the future. I never see the result printed.
It feels like there is not much missing to get this sorted. Any suggestions would be great.
Thanks
Upvotes: 1
Views: 4817
Reputation: 3109
I think we are using the asyncio library the wrong way, by mixing it with multi-process/multi-threading parallelism.
Here is an implementation based on the multiprocessing module. When submitting a task for your remote, your library can return a Queue that the caller can use with the get() method: it returns the value if one is available, otherwise it suspends the calling thread until a value arrives. Hence, the Queue acts like a Scala Future or a JS Promise.
import multiprocessing
import time
from concurrent.futures.thread import ThreadPoolExecutor
import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")

def remote_on_connect(client, *_):
    logging.info("Connected")
    client.subscribe("YAVA/API/TASK_START")

def remote_on_message(client, _, _1):
    logging.info("Remotely processing your data")
    time.sleep(1)
    logging.info("Publishing result")
    client.publish("YAVA/API/TASK_DONE", 42)

class Lib:
    def __init__(self):
        self.client = mqtt.Client()
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.client.on_connect = Lib.on_connect
        self.client.connect("test.mosquitto.org")
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

    def execute(self):
        cb, queue = self.get_cb()
        self.client.on_message = cb
        self.client.publish("YAVA/API/TASK_START", "foo")
        return queue

    @staticmethod
    def on_connect(client, *_):
        logging.info("Connected")
        client.subscribe("YAVA/API/TASK_DONE")

    def get_cb(self):
        queue = multiprocessing.Queue(maxsize=1)

        def cb(_0, _1, msg):
            self.client.on_message = None
            logging.info("Fetching back the result")
            logging.info(str(msg.payload.decode("utf-8", "ignore")))
            queue.put(42)
            logging.info("Queue filled")

        return cb, queue

def main():
    remote_client = mqtt.Client()
    remote_client.on_connect = remote_on_connect
    remote_client.on_message = remote_on_message
    remote_client.connect("test.mosquitto.org")
    remote_client.loop_start()
    lib = Lib()
    future = lib.execute()
    logging.info("Result is:")
    logging.info(future.get())
    remote_client.loop_stop()
    lib.stop()
    logging.info("Exiting")

if __name__ == '__main__':
    main()
2019-11-19 15:08:34,433 ## 139852611577600 ## remote_on_connect ## Connected
2019-11-19 15:08:34,450 ## 139852603184896 ## on_connect ## Connected
2019-11-19 15:08:34,452 ## 139852632065728 ## main ## Result is:
2019-11-19 15:08:34,467 ## 139852611577600 ## remote_on_message ## Remotely processing your data
2019-11-19 15:08:35,469 ## 139852611577600 ## remote_on_message ## Publishing result
2019-11-19 15:08:35,479 ## 139852603184896 ## cb ## Fetching back the result
2019-11-19 15:08:35,479 ## 139852603184896 ## cb ## 42
2019-11-19 15:08:35,480 ## 139852603184896 ## cb ## Queue filled
2019-11-19 15:08:35,480 ## 139852632065728 ## main ## 42
2019-11-19 15:08:36,481 ## 139852632065728 ## main ## Exiting
As you can see in the output, the main method executes up to the future.get call (as shown by the Result is: line early in the log). Then processing happens in another thread, until a value is put into the shared Queue. At that point future.get returns (because the value is available) and the main method proceeds to the end.
Hope this helps you achieve what you want, but any insights about better ways to do this, either with asyncio or with a smaller data structure than Queue, are welcome.
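On the "smaller data structure than Queue" point, one candidate is the thread-safe concurrent.futures.Future from the standard library. The following is only a minimal sketch: submit_task and fake_remote are hypothetical names, and a plain thread stands in for the MQTT callback that would deliver the real reply. Note that the Python documentation reserves creating such a Future directly and calling set_result on it for executor implementations and tests, so treat this as an illustration of the idea rather than an endorsed pattern.

```python
import threading
import time
from concurrent.futures import Future


def submit_task(payload):
    """Hypothetical library call: returns a Future for the eventual reply."""
    future = Future()

    def fake_remote():
        # Stand-in for the broker round trip and the on_message callback.
        time.sleep(0.1)
        future.set_result(payload.upper())

    threading.Thread(target=fake_remote).start()
    return future


def run_demo():
    future = submit_task("foo")
    # Blocks the calling thread until the result is set, or raises on timeout.
    return future.result(timeout=5)
```

Compared with a Queue, this object also carries failures (set_exception) and supports a timeout on result(). If the caller is a coroutine, asyncio.wrap_future can wrap such a future so that it can be awaited.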
Upvotes: 1
Reputation: 3109
The first thing I can see is that you publish to the YAVA/API/TASK_START topic while checking for the YAVA/API/TASK_DONE/ topic in your on_message callback. Hence, your _future never gets a result and the await _future never returns...
I advise you to add logging. Add these lines at the start of your code:
import logging
logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")
Then use logging.info(...) to trace your execution order.
I added some to your code (as well as changing the condition in on_message) and here is the output.
2019-11-19 11:37:10,440 ## 140178907485888 ## __init__ ## Using selector: EpollSelector
2019-11-19 11:37:10,478 ## 140178907485888 ## on_connect ## Connected
2019-11-19 11:37:10,478 ## 140178887976704 ## _simulateClient ## Enter simulate client
2019-11-19 11:37:10,479 ## 140178887976704 ## __init__ ## Using selector: EpollSelector
2019-11-19 11:37:10,480 ## 140178887976704 ## performAsyncTask ## Perform async task
2019-11-19 11:37:10,480 ## 140178887976704 ## pubAndWhaitForResponse ## Pub and wait
2019-11-19 11:37:10,481 ## 140178887976704 ## pubAndWhaitForResponse ## Publish
2019-11-19 11:37:10,481 ## 140178887976704 ## pubAndWhaitForResponse ## Await future: <Future pending created at /usr/lib/python3.7/asyncio/base_events.py:391>
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## New message
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## Topic: YAVA/API/TASK_DONE
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## Filling future: <Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7df0f5fd10>()] created at /usr/lib/python3.7/asyncio/base_events.py:391>
I also added a log after the _future.set_result(payload) line, but it never appears. So set_result seems to hang or something like that...
You probably have to dig inside it to know why/where it hangs.
By the way, you are mixing many concepts: asyncio, threading, and mqtt (with its own loop).
Moreover, asyncio.Future is not thread-safe, so I think it's dangerous to use it as you do. While stepping into the set_result method with a debugger, I encountered an exception in the mqtt client class:
Non-thread-safe operation invoked on an event loop other than the current one
It is never reported on stdout/stderr, but you can maybe catch it in the on_log callback of your client.
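One documented way to deal with the thread-safety problem is to hand the set_result call over to the loop that owns the future, using loop.call_soon_threadsafe. As far as I understand it, a direct set_result from a foreign thread schedules the wake-up without notifying the sleeping loop, which would explain why the await never completes. The sketch below is self-contained and does not use paho: fake_broker_callback is a hypothetical stand-in for on_message, run on a plain thread.

```python
import asyncio
import threading
import time


def fake_broker_callback(loop, future, payload):
    """Stand-in for paho's on_message, running on a non-asyncio thread."""
    time.sleep(0.1)  # pretend the remote machine is working
    # Do not call future.set_result() directly from this thread: ask the
    # loop that owns the future to run it on its own thread instead.
    loop.call_soon_threadsafe(future.set_result, payload)


async def publish_and_wait(payload):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # A plain thread simulates the MQTT network thread delivering the reply.
    threading.Thread(
        target=fake_broker_callback, args=(loop, future, payload)
    ).start()
    return await future


def run_demo():
    return asyncio.run(publish_and_wait("task done"))
```

In a real client, the loop and the future would have to be captured where the request is published and made reachable from the on_message callback.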
Here is a more Pythonic version of your code. In this one, set_result does not hang (the log just after it is displayed), but it is the await in main that hangs.
import asyncio
import time
import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")

def remote_on_connect(client, *_):
    logging.info("Connected")
    client.subscribe("YAVA/API/TASK_START")

def remote_on_message(client, _, _1):
    logging.info("Remotely processing your data")
    time.sleep(1)
    logging.info("Publishing result")
    client.publish("YAVA/API/TASK_DONE", 42)

class Lib:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = Lib.on_connect
        # paho calls on_log with (client, userdata, level, buf)
        self.client.on_log = lambda client, userdata, level, buf: logging.info("Log: %s", buf)
        self.client.connect("test.mosquitto.org")
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

    def execute(self):
        self.client.publish("YAVA/API/TASK_START", "foo")
        cb, fut = Lib.get_cb()
        self.client.on_message = cb
        return fut

    @staticmethod
    def on_connect(client, *_):
        logging.info("Connected")
        client.subscribe("YAVA/API/TASK_DONE")

    @staticmethod
    def get_cb():
        fut = asyncio.get_event_loop().create_future()

        def cb(_0, _1, msg):
            logging.info("Fetching back the result")
            logging.info(str(msg.payload.decode("utf-8", "ignore")))
            fut.set_result(42)
            logging.info("Future updated")

        return cb, fut

async def main():
    remote_client = mqtt.Client()
    remote_client.on_connect = remote_on_connect
    remote_client.on_message = remote_on_message
    remote_client.connect("test.mosquitto.org")
    remote_client.loop_start()
    lib = Lib()
    future = lib.execute()
    logging.info("Result is:")
    await future
    logging.info(future.result())
    remote_client.loop_stop()
    lib.stop()
    logging.info("Exiting")

if __name__ == '__main__':
    asyncio.run(main())
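For comparison with the TASKS dictionary in the question's Javascript, here is a rough sketch of how pending requests could be tracked per task ID and then resolved or rejected from the message callback. Everything in it is an assumption made for illustration: the PendingTasks class, the RESULT_OK/RESULT_KO topic layout borrowed from the Javascript example, and the timers that stand in for replies arriving on the MQTT network thread. It relies on loop.call_soon_threadsafe so that the futures are only ever touched on the event loop's own thread.

```python
import asyncio
import threading


class PendingTasks:
    """Hypothetical registry mapping a task ID to the future awaiting its reply."""

    def __init__(self, loop):
        self._loop = loop
        self._futures = {}

    def create(self, task_id):
        # Must be called on the event loop's thread.
        future = self._loop.create_future()
        self._futures[task_id] = future
        return future

    def on_message(self, topic, payload):
        # May be called from any thread: defer the real work to the loop.
        self._loop.call_soon_threadsafe(self._settle, topic, payload)

    def _settle(self, topic, payload):
        # Runs on the loop thread, so touching the futures is safe here.
        status, _, task_id = topic.partition("/")
        future = self._futures.pop(task_id, None)
        if future is None or future.done():
            return
        if status == "RESULT_OK":
            future.set_result(payload)  # plays the role of resolve()
        else:
            future.set_exception(RuntimeError(payload))  # plays the role of reject()


async def demo():
    pending = PendingTasks(asyncio.get_running_loop())
    ok = pending.create("123")
    ko = pending.create("456")
    # Timers stand in for replies delivered on another thread.
    threading.Timer(0.1, pending.on_message, args=("RESULT_OK/123", "done")).start()
    threading.Timer(0.1, pending.on_message, args=("RESULT_KO/456", "failed")).start()
    first = await asyncio.wait_for(ok, timeout=5)
    try:
        await asyncio.wait_for(ko, timeout=5)
        second = None
    except RuntimeError as exc:
        second = str(exc)
    return first, second


def run_demo():
    return asyncio.run(demo())
```

With this shape, the calling code can await one request after another inside a single try/except, which is close to the Javascript usage shown in the question.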
Upvotes: 0