Drew

Reputation: 1261

Python asyncio (aiohttp, aiofiles)

I am having a difficult time understanding Python's asyncio. I have not written any code yet, because all the examples I see are for one-off runs: create a few coroutines, add them to an event loop, run the loop, let it switch between the tasks, done. That does not seem all that helpful for my case.

I want to use asyncio so that long-running operations do not block my application (which uses PyQt5). I want to create some functions that, when called, run in the asyncio event loop and invoke a callback when they are done.

What I imagine is this: create a separate thread for asyncio, create the loop, and run it forever. Create some functions such as getFile(url, fp), get(url), readFile(file), etc. Then in the UI I have a text box with a submit button; the user enters a URL, clicks submit, and the file is downloaded.

But none of the examples I have seen show how to add a coroutine to a loop that is already running, and I do not see how I could do what I want without that.

#!/bin/python3
import asyncio
import aiohttp
import threading

loop = asyncio.get_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def _get(url, callback):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            result = await response.text()
            callback(result)
            return

def get(url, callback):
    asyncio.ensure_future(_get(url, callback))

thread = threading.Thread(target=async_in_thread, args=(loop, ))

thread.start()

def stop():
    loop.close()

def callme(data):
    print(data)
    stop()

get("http://google.com", callme)

thread.join()

This is what I imagine, but it does not work.

Upvotes: 2

Views: 3240

Answers (2)

Andrew Svetlov

Reputation: 17386

Install Quamash to smooth the integration between Qt and asyncio.

The example from the project's README gives an idea of what it looks like:

import sys
import asyncio
import time

from PyQt5.QtWidgets import QApplication, QProgressBar
from quamash import QEventLoop, QThreadExecutor

app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)  # NEW must set the event loop

progress = QProgressBar()
progress.setRange(0, 99)
progress.show()

async def master():
    await first_50()
    with QThreadExecutor(1) as exec:
        await loop.run_in_executor(exec, last_50)

async def first_50():
    for i in range(50):
        progress.setValue(i)
        await asyncio.sleep(.1)

def last_50():
    for i in range(50,100):
        loop.call_soon_threadsafe(progress.setValue, i)
        time.sleep(.1)

with loop: ## context manager calls .close() when loop completes, and releases all resources
    loop.run_until_complete(master())

Upvotes: 1

user4815162342

Reputation: 155475

To add a coroutine to a loop running in a different thread, use asyncio.run_coroutine_threadsafe:

def get(url, callback):
    # The target loop must be passed explicitly as the second argument.
    asyncio.run_coroutine_threadsafe(_get(url, callback), loop)

In general, when you are interacting with the event loop from outside the thread that runs it, you must run everything through either run_coroutine_threadsafe (for coroutines) or loop.call_soon_threadsafe (for functions). For example, to stop the loop, use loop.call_soon_threadsafe(loop.stop). Also note that loop.close() must not be invoked inside a loop callback, so you should place that call in async_in_thread, right after the call to run_forever(), at which point the loop has definitely stopped running.
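
The start/stop sequence described above can be sketched as follows. This is a minimal illustration, not the asker's exact program: the work coroutine is a hypothetical stand-in that sleeps instead of performing an HTTP request, and the loop is created with asyncio.new_event_loop() so the example does not depend on the main thread's default loop.

```python
import asyncio
import threading

# A dedicated loop for the background thread.
loop = asyncio.new_event_loop()

def async_in_thread(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # run_forever() has returned, so the loop is stopped and safe to close.
        loop.close()

async def work(x):
    # Hypothetical stand-in for real I/O such as an aiohttp request.
    await asyncio.sleep(0.01)
    return x * 2

thread = threading.Thread(target=async_in_thread, args=(loop,))
thread.start()

# Submit a coroutine from the main thread and wait for its result.
future = asyncio.run_coroutine_threadsafe(work(21), loop)
result = future.result(timeout=5)

# Ask the loop to stop from outside its thread, then wait for the thread.
loop.call_soon_threadsafe(loop.stop)
thread.join()
```

Closing the loop in a finally clause inside async_in_thread keeps the close in the thread that ran the loop and guarantees it happens only after run_forever() has returned.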

Another thing with asyncio is that passing explicit when_done callbacks isn't idiomatic because asyncio exposes the concept of futures (akin to JavaScript promises), which allow attaching callbacks to a not-yet-available result. For example, one could write _get like this:

async def _get(url):
    print("get: " + url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

It doesn't need a callback argument because any interested party can convert it to a task using loop.create_task and use add_done_callback to be notified when the task is complete. For example:

def _get_with_callback(url, callback):
    loop = asyncio.get_event_loop()
    task = loop.create_task(_get(url))
    task.add_done_callback(lambda _fut: callback(task.result()))
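
For code that already runs inside the event loop, the same pattern can be tried end to end with a small sketch. Here _get is a hypothetical stand-in that sleeps rather than using aiohttp, and the results list and main coroutine exist only for the demonstration.

```python
import asyncio

async def _get(url):
    # Hypothetical stand-in for the aiohttp request in the answer.
    await asyncio.sleep(0.01)
    return "body of " + url

results = []

async def main():
    loop = asyncio.get_running_loop()
    task = loop.create_task(_get("http://example.com"))
    # The callback receives the finished task as its only argument.
    task.add_done_callback(lambda fut: results.append(fut.result()))
    await task
    # Yield once more so any pending done callbacks get a chance to run.
    await asyncio.sleep(0)

asyncio.run(main())
```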

In your case you're not dealing with the task directly because your code aims to communicate with the event loop from another thread. However, run_coroutine_threadsafe returns a very useful value - a full-fledged concurrent.futures.Future which you can use to register done callbacks. Instead of accepting a callback argument, you can expose the future object to the caller:

def get(url):
    return asyncio.run_coroutine_threadsafe(_get(url), loop)

Now the caller can choose a callback-based approach:

future = get(url)
# call me when done
future.add_done_callback(some_callback)
# ... proceed with other work ...

or, when appropriate, they can even wait for the result:

# give me the response, I'll wait for it
result = get(url).result()

The latter is by definition blocking, but since the event loop is safely running in a different thread, it is not affected by the blocking call.
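
Both styles can be exercised together in a self-contained sketch. The assumptions here: _get sleeps instead of issuing a real request, the URLs are placeholders, and a threading.Event is used only so the demonstration can wait for the callback before shutting down.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def _get(url):
    # Hypothetical stand-in for the aiohttp request.
    await asyncio.sleep(0.01)
    return "body of " + url

def get(url):
    # Returns a concurrent.futures.Future tied to the coroutine's outcome.
    return asyncio.run_coroutine_threadsafe(_get(url), loop)

received = []
done = threading.Event()

def some_callback(future):
    # Invoked with the completed future as its only argument.
    received.append(future.result())
    done.set()

# Callback style: register interest and carry on.
future = get("http://example.com")
future.add_done_callback(some_callback)
done.wait(timeout=5)

# Blocking style: wait in the calling thread for the result.
blocking_result = get("http://example.org").result(timeout=5)

# Shut down: stop the loop from outside, then close it once it has stopped.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Note that a done callback on this kind of future normally runs in the thread that completes it, which here is the event loop thread. In a PyQt application the callback should therefore hand the result over to the GUI thread (for example by emitting a signal) rather than touch widgets directly.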

Upvotes: 8
