Reputation: 1261
I seem to be having a difficult time understanding pythons asyncio. I have not written any code, as all the examples I see are for one-off runs. Create a few coroutine's, add them to an event loop, then run the loop, they run the tasks switching between them, done. Which does not seem all that helpful for me.
I want to use asyncio to not interrupt the operation in my application (using pyqt5). I want to create some functions that when called run in the asyncio event loop, then when they are done they do a callback.
What I imagine is. Create a separate thread for asyncio, create the loop and run it forever. Create some functions getFile(url, fp)
, get(url)
, readFile(file)
, etc. Then in the UI, I have a text box with a submit button, user enters url, clicks submit, it downloads the file.
But, every example I see, I cannot see how to add a coroutine to a running loop. And I do not see how I could do what I want without adding to a running loop.
#!/bin/python3
import asyncio
import aiohttp
import threading
loop = asyncio.get_event_loop()
def async_in_thread(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def _get(url, callback):
print("get: " + url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
result = await response.text()
callback(result)
return
def get(url, callback):
asyncio.ensure_future(_get(url, callback))
thread = threading.Thread(target=async_in_thread, args=(loop, ))
thread.start()
def stop():
loop.close()
def callme(data):
print(data)
stop()
get("http://google.com", callme)
thread.join()
This is what I imagine, but it does not work.
Upvotes: 2
Views: 3240
Reputation: 17386
Install QualMash to smooth integration between Qt and asyncio.
Example from the project's README gives an inspiration for how it looks like:
import sys
import asyncio
import time
from PyQt5.QtWidgets import QApplication, QProgressBar
from quamash import QEventLoop, QThreadExecutor
app = QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop) # NEW must set the event loop
progress = QProgressBar()
progress.setRange(0, 99)
progress.show()
async def master():
await first_50()
with QThreadExecutor(1) as exec:
await loop.run_in_executor(exec, last_50)
async def first_50():
for i in range(50):
progress.setValue(i)
await asyncio.sleep(.1)
def last_50():
for i in range(50,100):
loop.call_soon_threadsafe(progress.setValue, i)
time.sleep(.1)
with loop: ## context manager calls .close() when loop completes, and releases all resources
loop.run_until_complete(master())
Upvotes: 1
Reputation: 155475
To add a coroutine to a loop running in a different thread, use asyncio.run_coroutine_threadsafe
:
def get(url, callback):
asyncio.run_coroutine_threadsafe(_get(url, callback))
In general, when you are interacting with the event loop from outside the thread that runs it, you must run everything through either run_coroutine_threadsafe
(for coroutines) or loop.call_soon_threadsafe
(for functions). For example, to stop the loop, use loop.call_soon_threadsafe(loop.stop)
. Also note that loop.close()
must not be invoked inside a loop callback, so you should place that call in async_in_thread
, right after the call to run_forever()
, at which point the loop has definitely stopped running.
Another thing with asyncio is that passing explicit when_done
callbacks isn't idiomatic because asyncio exposes the concept of futures (akin to JavaScript promises), which allow attaching callbacks to a not-yet-available result. For example, one could write _get
like this:
async def _get(url):
print("get: " + url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
It doesn't need a callback
argument because any interested party can convert it to a task using loop.create_task
and use add_done_callback
to be notified when the task is complete. For example:
def _get_with_callback(url, callback):
loop = asyncio.get_event_loop()
task = loop.create_task(_get(url))
task.add_done_callback(lambda _fut: callback(task.result()))
In your case you're not dealing with the task directly because your code aims to communicate with the event loop from another thread. However, run_coroutine_threadsafe
returns a very useful value - a full-fledged concurrent.futures.Future
which you can use to register done callbacks. Instead of accepting a callback
argument, you can expose the future object to the caller:
def get(url):
return asyncio.run_coroutine_threadsafe(_get(url), loop)
Now the caller can choose a callback-based approach:
future = get(url)
# call me when done
future.add_done_callback(some_callback)
# ... proceed with other work ...
or, when appropriate, they can even wait for the result:
# give me the response, I'll wait for it
result = get(url).result()
The latter is by definition blocking, but since the event loop is safely running in a different thread, it is not affected by the blocking call.
Upvotes: 8