Reputation: 1164
Is this a correct approach for running several independent asyncio event loops, each in its own thread?
import asyncio
import threading

def init():
    print("Initializing Async...")
    global loop_heavy
    loop_heavy = asyncio.new_event_loop()
    start_loop(loop_heavy)

def start_loop(loop):
    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_heavy(task):
    future = asyncio.run_coroutine_threadsafe(task, loop_heavy)
    try:
        future.result()
    except Exception as e:
        print(e)

def stop():
    loop_heavy.call_soon_threadsafe(loop_heavy.stop)

async def heavy():
    print("3. heavy start %s" % threading.current_thread().name)
    await asyncio.sleep(3)  # or await asyncio.sleep(3, loop=loop_heavy)
    print("4. heavy done")
Then I test it with:

if __name__ == "__main__":
    init()
    print("1. submit heavy: %s" % threading.current_thread().name)
    submit_heavy(heavy())
    print("2. submit is done")
    stop()
I expect to see 1->3->2->4, but the actual order is 1->3->4->2:
Initializing Async...
1. submit heavy: MainThread
3. heavy start Thread-1
4. heavy done
2. submit is done
I think I am missing something in my understanding of async and threads. The threads are different, so why am I waiting inside MainThread until the job inside Thread-1 is finished?
Upvotes: 0
Views: 2813
Reputation: 154906
Why am I waiting inside MainThread until the job inside Thread-1 is finished?
Good question, why are you?
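One possible answer is: because you actually want to block the current thread until the job is finished. In your submit_heavy(), the call to future.result() is what blocks MainThread until the coroutine completes. Being able to wait like this is one of the reasons to put the event loop in another thread and use run_coroutine_threadsafe.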
The other possible answer is that you don't have to wait if you don't want to. You can simply return from submit_heavy() the concurrent.futures.Future object returned by run_coroutine_threadsafe, and leave it to the caller to wait for the result (or check whether one is ready) at their own leisure.
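A minimal sketch of that second option, assuming the same background-loop setup as in the question. The events list and the shortened sleep are illustrative additions, not part of the original code:

```python
import asyncio
import threading

# Event loop running in a background thread, as in the question.
loop_heavy = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop_heavy.run_forever, daemon=True)
loop_thread.start()

events = []  # hypothetical helper: records the order in which steps happen

async def heavy():
    events.append("3. heavy start")
    await asyncio.sleep(0.5)
    events.append("4. heavy done")
    return "result"

def submit_heavy(coro):
    # Return the concurrent.futures.Future instead of calling .result() here.
    return asyncio.run_coroutine_threadsafe(coro, loop_heavy)

events.append("1. submit heavy")
future = submit_heavy(heavy())
events.append("2. submit is done")  # reached without waiting for heavy()

value = future.result(timeout=5)    # block only at the point the result is needed

# Shut down: stop the loop from its own thread, then clean up.
loop_heavy.call_soon_threadsafe(loop_heavy.stop)
loop_thread.join()
loop_heavy.close()
print(events, value)
```

Here step 2 is recorded before step 4 because the main thread only blocks at future.result(). Whether step 3 lands before or after step 2 depends on thread scheduling.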
Finally, if your goal is just to run a regular function "in the background" (without blocking the current thread), perhaps you don't need asyncio at all. Take a look at the concurrent.futures module, whose ThreadPoolExecutor allows you to easily submit a function to a thread pool and leave it to execute unassisted.
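As a rough illustration of the ThreadPoolExecutor route, with no event loop involved (blocking_job is a hypothetical stand-in for whatever function you want to run in the background):

```python
import concurrent.futures
import threading
import time

def blocking_job(msg):
    # Hypothetical blocking function; time.sleep stands in for real work.
    time.sleep(0.5)
    return "%s handled in %s" % (msg, threading.current_thread().name)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(blocking_job, "a")   # returns immediately
    done_right_after_submit = future.done()   # main thread was not blocked
    result = future.result()                  # wait only when the value is needed

print(done_right_after_submit, result)
```

submit() hands the call to a worker thread and returns a Future right away, so the caller decides if and when to wait.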
Upvotes: 3
Reputation: 1164
I will add one possible solution that I found in the asyncio documentation. I'm not sure it is the correct way, but it works as expected (MainThread is not blocked by the execution of the child thread).
Running Blocking Code
Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second.
An executor can be used to run a task in a different thread or even in a different process to avoid blocking the OS thread with the event loop. See the loop.run_in_executor() method for more details.
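For reference, a small sketch of how I understand the pattern the documentation describes: run_in_executor awaited from inside a coroutine, so other tasks on the loop keep running in the meantime. The names blocking_call and ticker are made up for this illustration:

```python
import asyncio
import time

def blocking_call(seconds):
    # Hypothetical blocking function; time.sleep stands in for real work.
    time.sleep(seconds)
    return seconds

async def ticker(log):
    # Keeps making progress while the blocking call runs in the executor.
    for i in range(3):
        log.append("tick %d" % i)
        await asyncio.sleep(0.1)

async def main():
    log = []
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    blocking = loop.run_in_executor(None, blocking_call, 0.5)
    await ticker(log)
    log.append("blocking returned %s" % await blocking)
    return log

log = asyncio.run(main())
print(log)
```

The ticks are logged while blocking_call is still sleeping in a worker thread, which is the "do not block the event loop" behaviour the quoted passage is about.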
Applied to my code:

import asyncio
import threading
import concurrent.futures
import multiprocessing
import time

def init():
    print("Initializing Async...")
    global loop, thread_executor_pool
    thread_executor_pool = concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
    # new_event_loop() rather than get_event_loop(): calling get_event_loop()
    # with no current loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_task(task, *args):
    # Hands the blocking function to the thread pool and returns immediately.
    loop.run_in_executor(thread_executor_pool, task, *args)

def stop():
    loop.call_soon_threadsafe(loop.stop)
    # shutdown() waits for already submitted tasks to finish.
    thread_executor_pool.shutdown()

def blocked_task(msg1, msg2):
    print("3. task start msg: %s, %s, thread: %s" % (msg1, msg2, threading.current_thread().name))
    time.sleep(3)
    print("4. task is done -->")

if __name__ == "__main__":
    init()
    print("1. --> submit task: %s" % threading.current_thread().name)
    submit_task(blocked_task, "a", "b")
    print("2. --> submit is done")
    stop()
Output:
Initializing Async...
1. --> submit task: MainThread
3. task start msg: a, b, thread: ThreadPoolExecutor-0_0
2. --> submit is done
4. task is done -->
Please correct me if there are still any mistakes or if this can be done another way.
Upvotes: -1