IgorZ
IgorZ

Reputation: 1164

Python: asyncio loops with threads

Could you tell me if this is a correct approach to build several independent async loops inside own threads?

def init():
    print("Initializing Async...")
    global loop_heavy
    loop_heavy = asyncio.new_event_loop()
    start_loop(loop_heavy)

def start_loop(loop):
    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_heavy(task):
    future = asyncio.run_coroutine_threadsafe(task, loop_heavy)
    try:
        future.result()
    except Exception as e:
        print(e)

def stop():
    loop_heavy.call_soon_threadsafe(loop_heavy.stop)

async def heavy():
    print("3. heavy start %s" % threading.current_thread().name)
    await asyncio.sleep(3) # or await asyncio.sleep(3, loop=loop_heavy)
    print("4. heavy done")

Then I am testing it with:

if __name__ == "__main__":
    init()
    print("1. submit heavy: %s" % threading.current_thread().name)
    submit_heavy(heavy())
    print("2. submit is done")
    stop()

I am expecting to see 1->3->2->4 but in fact it is 1->3->4->2:

Initializing Async...
1. submit heavy: MainThread
3. heavy start Thread-1
4. heavy done
2. submit is done

I think that I miss something in understanding async and threads.
Threads are different. Why am I waiting inside MainThread until the job inside Thread-1 is finished?

Upvotes: 0

Views: 2813

Answers (2)

user4815162342
user4815162342

Reputation: 154906

Why am I waiting inside MainThread until the job inside Thread-1 is finished?

Good question, why are you?

One possible answer is, because you actually want to block the current thread until the job is finished. This is one of the reasons to put the event loop in another thread and use run_coroutine_threadsafe.

The other possible answer is that you don't have to if you don't want. You can simply return from submit_heavy() the concurrent.futures.Future object returned by run_coroutine_threadsafe, and leave it to the caller to wait for the result (or check if one is ready) at their own leisure.

Finally, if your goal is just to run a regular function "in the background" (without blocking the current thread), perhaps you don't need asyncio at all. Take a look at the concurrent.futures module, whose ThreadPoolExecutor allows you to easily submit a function to a thread pool and leave it to execute unassisted.

Upvotes: 3

IgorZ
IgorZ

Reputation: 1164

I will add one of the possible solutions that I found from the asyncio documentation.
I'm not sure that it is the correct way, but it works as expected (MainThread is not blocked by the execution of the child thread)

Running Blocking Code
Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second.
An executor can be used to run a task in a different thread or even in a different process to avoid blocking block the OS thread with the event loop. See the loop.run_in_executor() method for more details.

Applying to my code:

import asyncio
import threading
import concurrent.futures
import multiprocessing
import time

def init():
    print("Initializing Async...")

    global loop, thread_executor_pool

    thread_executor_pool = concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
    loop = asyncio.get_event_loop()

    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_task(task, *args):
    loop.run_in_executor(thread_executor_pool, task, *args)

def stop():
    loop.call_soon_threadsafe(loop.stop)
    thread_executor_pool.shutdown()

def blocked_task(msg1, msg2):
    print("3. task start msg: %s, %s, thread: %s" % (msg1, msg2, threading.current_thread().name))
    time.sleep(3)
    print("4. task is done -->")

if __name__ == "__main__":
    init()
    print("1. --> submit task: %s" % threading.current_thread().name)
    submit_task(blocked_task, "a", "b")

    print("2. --> submit is done")
    stop()

Output:

Initializing Async...

1. --> submit task: MainThread
3. task start msg: a, b, thread: ThreadPoolExecutor-0_0
2. --> submit is done
4. task is done  -->

Correct me if there are still any mistakes or it can be done in the other way.

Upvotes: -1

Related Questions