user3742205

Reputation: 41

How to use Async / Await in Python with messaging based invocation

After years of NodeJS dev, I decided to give Python a shot. So far so good, but I just ran into a wall that I would really like some help with.

I am working on a library that communicates with a remote machine using MQTT. When invoking a function on that library, a message is posted for processing on that remote machine. Once the processing is done, it posts a new message on the bus that my library picks up on and returns the result back to the calling code (the code that invoked the library function).

In JavaScript, this is done by returning a Promise whose resolve and reject functions can be stored within the library until the remote message comes back through the broker with the result (intercepted in a different function elsewhere in the library). At that point, I can simply invoke the previously stored resolve function to return control to the calling code (the code that invoked the async function of my library). The calling code simply uses the await keyword on this library function.

Now in Python, async/await does not use resolve and reject functions that can conveniently be stored away for later, so I suppose the logic must be implemented differently. Using a simple callback function rather than an async/await workflow works, but it is inconvenient when the library is invoked multiple times in sequence for similar back-and-forth communications, since each result-handling callback is a separate function.
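A minimal sketch (not from the original post) of the closest asyncio analogue: an asyncio.Future created on the running loop exposes set_result() and set_exception(), bound methods that can be stored away and called later, much like a Promise's resolve and reject. The helper name wait_for_reply and the call_later() delay standing in for the remote reply are illustrative assumptions; everything here runs on a single event loop thread.

```python
import asyncio


async def wait_for_reply(ok: bool) -> str:
    """Hypothetical helper: await a reply that is settled later, like a Promise."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # These bound methods play the role of a Promise's resolve/reject and
    # could be stored in a dict until the matching message arrives.
    resolve, reject = fut.set_result, fut.set_exception

    # Simulate the remote reply arriving 50 ms later on the same event loop.
    if ok:
        loop.call_later(0.05, resolve, "done")
    else:
        loop.call_later(0.05, reject, RuntimeError("remote failure"))

    # Suspends this coroutine (not the whole thread) until resolve/reject runs.
    return await fut


if __name__ == "__main__":
    print(asyncio.run(wait_for_reply(True)))  # prints "done"
    try:
        asyncio.run(wait_for_reply(False))
    except RuntimeError as exc:
        print(f"rejected: {exc}")  # prints "rejected: remote failure"
```

One caveat: set_result() and set_exception() must be called from the thread running the loop; a callback running on another thread would need loop.call_soon_threadsafe().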

Here is a basic example of what this would look like in Javascript (for illustration only):

let TASKS = {};

....

mqttClient.on('message', (topic, message) => {
    if (topic == "RESULT_OK/123") {
        TASKS["123"].resolve(message);
    } else if (topic == "RESULT_KO/123") {
        TASKS["123"].reject(message);
    }
});

...

let myAsyncLibraryFunction = (someCommand) => {
    return new Promise((res, rej) => {
        TASKS["123"] = {
            resolve: res,
            reject: rej
        };
        mqttClient.publish("REQUEST/123", someCommand);
    });
}

To call this, I would simply have to do:

try{
    let response1 = await myAsyncLibraryFunction("do this");
    let response2 = await myAsyncLibraryFunction("now do that");
    ...
} catch(e) {
    ...
}

Node.js is built around an event loop, which is why this pattern fits those use cases so well. But this type of application logic is common when dealing with disparate message-based backends, so I am sure there are good ways of solving this in Python as well.
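For comparison, here is a rough asyncio translation of the JavaScript TASKS pattern above, sketched under some assumptions: there is no real MQTT client, fake_publish() is a hypothetical stand-in that schedules a reply on the same event loop, and the REQUEST/<id>, RESULT_OK/<id> and RESULT_KO/<id> topics mirror the JavaScript example. With paho-mqtt, on_message runs on paho's network thread rather than on the event loop's thread, so the dispatch would have to be handed over with loop.call_soon_threadsafe() instead of calling set_result() directly.

```python
from __future__ import annotations

import asyncio
import itertools

# Pending requests keyed by task id: the Python counterpart of TASKS in the JS code.
TASKS: dict[str, asyncio.Future] = {}
_ids = itertools.count(1)


def on_message(topic: str, payload: str) -> None:
    """Settle the future waiting for this reply (runs on the event loop thread)."""
    status, _, task_id = topic.partition("/")
    if status not in ("RESULT_OK", "RESULT_KO"):
        return  # not a reply topic
    fut = TASKS.pop(task_id, None)
    if fut is None or fut.done():
        return  # unknown task, or the caller already gave up (e.g. cancelled)
    if status == "RESULT_OK":
        fut.set_result(payload)          # like resolve(message)
    else:
        fut.set_exception(RuntimeError(payload))  # like reject(message)


def fake_publish(topic: str, command: str) -> None:
    """Hypothetical stand-in for mqttClient.publish(): the 'remote' replies shortly after."""
    task_id = topic.split("/", 1)[1]
    if command == "fail":
        reply = (f"RESULT_KO/{task_id}", "remote error")
    else:
        reply = (f"RESULT_OK/{task_id}", f"handled {command}")
    asyncio.get_running_loop().call_later(0.01, on_message, *reply)


async def my_async_library_function(command: str) -> str:
    task_id = str(next(_ids))
    fut = asyncio.get_running_loop().create_future()
    TASKS[task_id] = fut  # store the "promise" before publishing the request
    fake_publish(f"REQUEST/{task_id}", command)
    return await fut


async def main() -> list[str]:
    results = []
    try:
        results.append(await my_async_library_function("do this"))
        results.append(await my_async_library_function("now do that"))
        await my_async_library_function("fail")
    except RuntimeError as exc:
        results.append(f"failed: {exc}")
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The sequential awaits inside one try/except read the same way as the JavaScript caller; a rejected request surfaces as an ordinary exception.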

This is a test Python code snippet that I am working on, that attempts to use a future object to achieve something similar:

import paho.mqtt.client as mqtt
import asyncio
import threading

# Init a new asyncio event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# define a global future placeholder
_future = None

# Create MQTT client
mqttClient = mqtt.Client()

# MQTT Event - on connect
def on_connect(client, userdata, flags, rc):
    print("Connected")
    client.subscribe("YAVA/#")
    
    # We start a new thread to test our workflow.
    #
    # If I had done this on the current thread, then the MQTT event loop 
    # would get stuck (does not process incoming and outgoing messages anymore) when 
    # calling "await" on the future object later on.
    taskThread = threading.Thread(target=_simulateClient, args=())
    taskThread.start()

# MQTT Event - on incoming message
def on_message(client, userdata, msg):
    
    global _future
    if msg.topic.startswith("YAVA/API/TASK_DONE/") == True:
        payload = str(msg.payload.decode("utf-8", "ignore"))
        # Resolve the future object
        _future.set_result(payload)

mqttClient.on_connect = on_connect
mqttClient.on_message = on_message




# Use asyncio to call an async function and test the workflow
def _simulateClient():
    asyncio.run(performAsyncTask())

# This async function will ask for a task to be performed on a remote machine, 
# and wait for the response to be sent back
async def performAsyncTask():
    result = await pubAndWhaitForResponse("YAVA/API/TASK_START", "")
    print(result)
    
# perform the actual MQTT async logic
async def pubAndWhaitForResponse(topic, message):
    # Create a future object that can be resolved in the MQTT event "on_message"
    global _future
    _future = asyncio.get_running_loop().create_future()

    # Publish message that will start the task execution remotely somewhere
    global mqttClient
    mqttClient.publish(topic, message)

    # Now wait (suspending this coroutine) until the future gets resolved
    result = await _future

    # Return the result
    return result


# Connect to the broker and run loop_forever() in the main thread.
mqttClient.connect("192.168.1.70", 1883, 60)
# loop_forever() blocks the main thread and processes incoming and outgoing
# messages on it (so callbacks such as on_message also run on this thread),
# which also keeps the program from exiting.
mqttClient.loop_forever()

It all runs fine, but the _future.set_result(payload) line does not seem to resolve the future. I never see the result printed.

It feels like there is not much missing to get this sorted. Any suggestions would be great.

Thanks

Upvotes: 1

Views: 4817

Answers (2)

AlexisBRENON

Reputation: 3109

I think we are using the asyncio library the wrong way, mixing it with multi-process/multi-threading parallelism.

Here is an implementation based on the multiprocessing module. When submitting a task to your remote machine, your library can return a Queue that the caller can use with its get() method: get() returns the value if one is available, otherwise it blocks the calling thread until a value arrives. Hence, the Queue acts like a Scala Future or a JS Promise.

import logging
import multiprocessing
import time

import paho.mqtt.client as mqtt

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")


# Simulated remote machine: waits for a task request and publishes a result.
def remote_on_connect(client, *_):
    logging.info("Connected")
    client.subscribe("YAVA/API/TASK_START")

def remote_on_message(client, _, _1):
    logging.info("Remotely processing your data")
    time.sleep(1)
    logging.info("Publishing result")
    client.publish("YAVA/API/TASK_DONE", 42)


class Lib:
    def __init__(self):
        self.client = mqtt.Client()

        self.client.on_connect = Lib.on_connect
        self.client.connect("test.mosquitto.org")
        # Run paho's network loop in a background thread.
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

    def execute(self):
        # Install the result callback before publishing, so the reply cannot be missed.
        cb, queue = self.get_cb()
        self.client.on_message = cb
        self.client.publish("YAVA/API/TASK_START", "foo")
        return queue

    @staticmethod
    def on_connect(client, *_):
        logging.info("Connected")
        client.subscribe("YAVA/API/TASK_DONE")

    def get_cb(self):
        # One-slot queue used as a future: the caller blocks on get() until cb puts the result.
        queue = multiprocessing.Queue(maxsize=1)
        def cb(_0, _1, msg):
            # Called on paho's network thread; handle only the first reply.
            self.client.on_message = None
            logging.info("Fetching back the result")
            payload = msg.payload.decode("utf-8", "ignore")
            logging.info(payload)
            queue.put(payload)
            logging.info("Queue filled")
        return cb, queue


def main():
    remote_client = mqtt.Client()
    remote_client.on_connect = remote_on_connect
    remote_client.on_message = remote_on_message
    remote_client.connect("test.mosquitto.org")
    remote_client.loop_start()

    lib = Lib()
    future = lib.execute()

    logging.info("Result is:")
    logging.info(future.get())

    remote_client.loop_stop()
    lib.stop()

    logging.info("Exiting")


if __name__ == '__main__':
    main()

Output:

2019-11-19 15:08:34,433 ## 139852611577600 ## remote_on_connect ## Connected
2019-11-19 15:08:34,450 ## 139852603184896 ## on_connect ## Connected
2019-11-19 15:08:34,452 ## 139852632065728 ## main ## Result is:
2019-11-19 15:08:34,467 ## 139852611577600 ## remote_on_message ## Remotely processing your data
2019-11-19 15:08:35,469 ## 139852611577600 ## remote_on_message ## Publishing result
2019-11-19 15:08:35,479 ## 139852603184896 ## cb ## Fetching back the result
2019-11-19 15:08:35,479 ## 139852603184896 ## cb ## 42
2019-11-19 15:08:35,480 ## 139852603184896 ## cb ## Queue filled
2019-11-19 15:08:35,480 ## 139852632065728 ## main ## 42
2019-11-19 15:08:36,481 ## 139852632065728 ## main ## Exiting

As you can see in the output, main() runs up to the future.get() call (as shown by the "Result is:" line early in the log). Then the processing happens in other threads, until a value is put inside the shared Queue. At that point future.get() returns (because the value is available) and main() proceeds to the end.
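If the goal is only to hand a result from paho's network thread to a waiting caller thread, the standard-library queue.Queue is the usual thread-safe choice; multiprocessing.Queue also works but is designed for passing (pickled) data between processes. A minimal sketch of the same one-slot handoff, where submit_task() and the fake network thread are hypothetical stand-ins for Lib.execute() and paho's loop thread:

```python
import queue
import threading
import time


def submit_task(command: str) -> "queue.Queue[str]":
    """Hypothetical library call: returns a one-slot queue used like a future."""
    result_q: "queue.Queue[str]" = queue.Queue(maxsize=1)

    def fake_network_thread() -> None:
        # Stands in for the MQTT on_message callback running on paho's thread.
        time.sleep(0.1)
        result_q.put(f"result of {command}")

    threading.Thread(target=fake_network_thread, daemon=True).start()
    return result_q


if __name__ == "__main__":
    future = submit_task("do this")
    # Blocks the calling thread until put() happens (or raises queue.Empty after 5 s).
    print(future.get(timeout=5))
```

Passing a timeout to get() avoids blocking forever if the remote machine never answers.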

Hope this helps you achieve what you want, but any insights into better ways to achieve this, either with asyncio or with a lighter data structure than Queue, are welcome.
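One candidate for a lighter structure than a Queue is concurrent.futures.Future, which is thread-safe, can be waited on with result(), and can be awaited from asyncio through asyncio.wrap_future(). A sketch using the same hypothetical stand-ins (submit_task() and a fake network thread). Note that the concurrent.futures documentation says Future instances are normally created by an Executor and that creating them directly is meant for testing, so treat this as a pragmatic shortcut rather than an officially blessed pattern.

```python
import asyncio
import concurrent.futures
import threading
import time


def submit_task(command: str) -> concurrent.futures.Future:
    """Hypothetical library call returning a thread-safe future."""
    fut: concurrent.futures.Future = concurrent.futures.Future()

    def fake_network_thread() -> None:
        # Stands in for paho's on_message callback on the network thread.
        time.sleep(0.1)
        fut.set_result(f"result of {command}")

    threading.Thread(target=fake_network_thread, daemon=True).start()
    return fut


async def async_caller() -> str:
    # wrap_future bridges the thread-safe future into the running event loop.
    return await asyncio.wrap_future(submit_task("now do that"))


if __name__ == "__main__":
    print(submit_task("do this").result(timeout=5))  # blocking style
    print(asyncio.run(async_caller()))               # async/await style
```

The same library call thus serves both blocking callers and asyncio callers.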

Upvotes: 1

AlexisBRENON

Reputation: 3109

The first thing I can see is that you publish to the YAVA/API/TASK_START topic, while your on_message callback only handles topics starting with YAVA/API/TASK_DONE/. Hence, your _future never gets a result and the await _future never returns...

I advise you to add logging. Add these lines at the start of your code:

import logging

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")

Then use logging.info(...) to trace your execution order. I added some to your code (as well as changing the condition in on_message) and here is the output.

2019-11-19 11:37:10,440 ## 140178907485888 ## __init__ ## Using selector: EpollSelector
2019-11-19 11:37:10,478 ## 140178907485888 ## on_connect ## Connected
2019-11-19 11:37:10,478 ## 140178887976704 ## _simulateClient ## Enter simulate client
2019-11-19 11:37:10,479 ## 140178887976704 ## __init__ ## Using selector: EpollSelector
2019-11-19 11:37:10,480 ## 140178887976704 ## performAsyncTask ## Perform async task
2019-11-19 11:37:10,480 ## 140178887976704 ## pubAndWhaitForResponse ## Pub and wait
2019-11-19 11:37:10,481 ## 140178887976704 ## pubAndWhaitForResponse ## Publish
2019-11-19 11:37:10,481 ## 140178887976704 ## pubAndWhaitForResponse ## Await future: <Future pending created at /usr/lib/python3.7/asyncio/base_events.py:391>
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## New message
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## Topic: YAVA/API/TASK_DONE
2019-11-19 11:37:10,499 ## 140178907485888 ## on_message ## Filling future: <Future pending cb=[<TaskWakeupMethWrapper object at 0x7f7df0f5fd10>()] created at /usr/lib/python3.7/asyncio/base_events.py:391>

I also added a log after the _future.set_result(payload) line, but it never appears. So the set_result seems to hang or something like that...

You probably have to dig inside it to know why/where it hangs.


Edit

By the way, you are mixing many concepts: asyncio, threading, and MQTT (with its own loop). Moreover, asyncio.Future is not thread-safe, so I think it is dangerous to use it the way you do. While stepping into the set_result method with a debugger, I encountered an exception in the MQTT client class:

Non-thread-safe operation invoked on an event loop other than the current one

It is never reported on stdout/stderr, but you may be able to catch it in the on_log callback of your client.
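The usual way around this error is to hand the call over to the loop's own thread with loop.call_soon_threadsafe(). A minimal sketch, assuming a plain threading.Thread (fake_mqtt_network_thread(), hypothetical) stands in for paho's network thread. In the question's code, the equivalent change would be to remember the running loop in pubAndWhaitForResponse and to call loop.call_soon_threadsafe(_future.set_result, payload) in on_message instead of _future.set_result(payload).

```python
import asyncio
import threading
import time


def fake_mqtt_network_thread(on_message) -> None:
    """Stands in for paho's loop thread delivering a message after a delay."""
    time.sleep(0.1)
    on_message("YAVA/API/TASK_DONE", b"42")


async def pub_and_wait_for_response() -> str:
    loop = asyncio.get_running_loop()  # the loop that owns the future
    future = loop.create_future()

    def on_message(topic: str, payload: bytes) -> None:
        # Runs on the foreign thread: do not call future.set_result() directly here.
        # call_soon_threadsafe() schedules it on the loop's own thread and wakes the loop.
        loop.call_soon_threadsafe(future.set_result, payload.decode("utf-8", "ignore"))

    threading.Thread(target=fake_mqtt_network_thread, args=(on_message,), daemon=True).start()
    return await future


if __name__ == "__main__":
    print(asyncio.run(pub_and_wait_for_response()))  # prints 42
```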


Edit 2

Here is a more Pythonic version of your code. In this one, set_result does not hang (the log line just after it is displayed), but now it is the await in main() that hangs.

import asyncio
import time

import paho.mqtt.client as mqtt
import logging

logging.basicConfig(level=logging.DEBUG,
                    format="%(asctime)s ## %(thread)d ## %(funcName)s ## %(message)s")


def remote_on_connect(client, *_):
    logging.info("Connected")
    client.subscribe("YAVA/API/TASK_START")

def remote_on_message(client, _, _1):
    logging.info("Remotely processing your data")
    time.sleep(1)
    logging.info("Publishing result")
    client.publish("YAVA/API/TASK_DONE", 42)


class Lib:
    def __init__(self):
        self.client = mqtt.Client()
        self.client.on_connect = Lib.on_connect
        # paho calls on_log(client, userdata, level, buf)
        self.client.on_log = lambda client, userdata, level, buf: logging.info("Log: %s", buf)
        self.client.connect("test.mosquitto.org")
        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

    def execute(self):
        # Install the result callback before publishing, so a fast reply cannot be missed.
        cb, fut = Lib.get_cb()
        self.client.on_message = cb
        self.client.publish("YAVA/API/TASK_START", "foo")
        return fut

    @staticmethod
    def on_connect(client, *_):
        logging.info("Connected")
        client.subscribe("YAVA/API/TASK_DONE")

    @staticmethod
    def get_cb():
        fut = asyncio.get_event_loop().create_future()
        def cb(_0, _1, msg):
            logging.info("Fetching back the result")
            logging.info(str(msg.payload.decode("utf-8", "ignore")))
            fut.set_result(42)
            logging.info("Future updated")
        return cb, fut


async def main():
    remote_client = mqtt.Client()
    remote_client.on_connect = remote_on_connect
    remote_client.on_message = remote_on_message
    remote_client.connect("test.mosquitto.org")
    remote_client.loop_start()

    lib = Lib()
    future = lib.execute()

    logging.info("Result is:")
    await future
    logging.info(future.result())

    remote_client.loop_stop()
    lib.stop()

    logging.info("Exiting")


if __name__ == '__main__':
    asyncio.run(main())

Upvotes: 0
