brunoop

Reputation: 1029

Asyncio two loops for different I/O tasks?

I am using Python 3's asyncio module to create a load-balancing application. I have two heavy I/O tasks:

Both processes are going to run forever, are independent of each other, and should not be blocked by the other one.

I can't use one event loop because they would block each other. Is there any way to have two event loops, or do I have to use multithreading/multiprocessing?

I tried using asyncio.new_event_loop() but haven't managed to make it work.

Upvotes: 61

Views: 84977

Answers (6)

kissgyorgy

Reputation: 3000

The whole point of asyncio is that you can run thousands of I/O-heavy tasks concurrently, so you don't need threads at all; this is exactly what asyncio is made for. Just run the two coroutines (SNMP and proxy) in the same loop and that's it. You have to make both of them available to the event loop before it starts running (for example, before calling loop.run_forever()). Something like this:

import asyncio

async def snmp():
    print("Doing the snmp thing")
    await asyncio.sleep(1)

async def proxy():
    print("Doing the proxy thing")
    await asyncio.sleep(2)

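async def main():
    while True:
        # Await both coroutines so the event loop gets a chance to run them.
        # Creating tasks in a tight loop without awaiting would never yield
        # to the loop and would pile up tasks without bound.
        await asyncio.gather(snmp(), proxy())

asyncio.run(main())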

I don't know the structure of your code, so the different modules might have their own infinite loop or something, in this case you can run something like this:

import asyncio

async def snmp():
    while True:
        print("Doing the snmp thing")
        await asyncio.sleep(1)

async def proxy():
    while True:
        print("Doing the proxy thing")
        await asyncio.sleep(2)

# get_event_loop() is deprecated when no loop is running, so create one explicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(snmp())
loop.create_task(proxy())
loop.run_forever()

Remember, both snmp and proxy need to be coroutines (async def) written in an asyncio-aware manner. asyncio will not make simple blocking Python functions suddenly "async".
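
To illustrate that last point, here is a minimal sketch, assuming Python 3.9+ for asyncio.to_thread. The names blocking_poll and ticker are hypothetical and not from the question. Calling a blocking function directly inside a coroutine would freeze every other task on the loop; handing it to a worker thread keeps the loop responsive.

```python
import asyncio
import time


def blocking_poll():
    # Hypothetical stand-in for a library call that is not asyncio-aware.
    time.sleep(0.3)
    return "poll done"


async def ticker(ticks):
    # Records a tick every 50 ms; it can only run while the loop is free.
    for i in range(4):
        ticks.append(i)
        await asyncio.sleep(0.05)


async def main():
    ticks = []
    ticker_task = asyncio.create_task(ticker(ticks))
    # to_thread runs the blocking function in a worker thread,
    # so the ticker keeps ticking while we wait for the result.
    result = await asyncio.to_thread(blocking_poll)
    ticks_during_poll = len(ticks)
    await ticker_task
    return result, ticks_during_poll


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_poll() were called directly inside main() instead, the ticker would not record a second tick until the blocking call had returned.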

In your specific case, I suspect you are a little confused (no offense!), because well-written async modules will never block each other in the same loop. If your modules do block each other, they are not really asynchronous; in that case you don't need asyncio at all and can simply run one of them in a separate thread without dealing with any asyncio stuff.

Upvotes: 61

tamerlaha

Reputation: 1990

I know it's an old thread, but it might still be helpful for someone. I'm no asyncio expert, but here is a slightly improved version of @kissgyorgy's answer. Instead of awaiting each coroutine separately, we create a list of tasks and await them together (Python 3.9):

import asyncio

async def snmp():
    while True:
        print("Doing the snmp thing")
        await asyncio.sleep(0.4)

async def proxy():
    while True:
        print("Doing the proxy thing")
        await asyncio.sleep(2)

async def main():
    tasks = []
    tasks.append(asyncio.create_task(snmp()))
    tasks.append(asyncio.create_task(proxy()))

    await asyncio.gather(*tasks)

asyncio.run(main())

Result:

Doing the snmp thing
Doing the proxy thing
Doing the snmp thing
Doing the snmp thing
Doing the snmp thing
Doing the snmp thing
Doing the proxy thing

Upvotes: 9

Johann Chang

Reputation: 1391

Though in most cases you don't need multiple event loops when using asyncio, people shouldn't assume their assumptions apply to every case, or offer what they think is better without directly addressing your original question.

Here's a demo of how to create new event loops in threads. Compared to your own answer, set_event_loop saves you from passing the loop object around every time you do an asyncio-based operation.

import asyncio
import threading


async def print_env_info_async():
    # As you can see each work thread has its own asyncio event loop.
    print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")


async def work():
    while True:
        await print_env_info_async()
        await asyncio.sleep(1)


def worker():
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    new_loop.run_until_complete(work())
    return


number_of_threads = 2
for _ in range(number_of_threads):
    threading.Thread(target=worker).start()

Ideally, you'll want to put heavy work in worker threads and keep the asyncio thread as light as possible. Think of the asyncio thread as the GUI thread of a desktop or mobile app: you don't want to block it. Worker threads are usually very busy, which is one of the reasons you don't want to create separate asyncio event loops in them. Here's an example of how to manage heavy worker threads with a single asyncio event loop, which is the most common practice for this kind of use case:

import asyncio
import concurrent.futures
import threading
import time


def print_env_info(source_thread_id):
    # This will be called in the main thread where the default asyncio event loop lives.
    print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}, source thread: {source_thread_id}")


def work(event_loop):
    while True:
        # The following line will fail because there's no asyncio event loop running in this worker thread.
        # print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")
        event_loop.call_soon_threadsafe(print_env_info, threading.get_ident())
        time.sleep(1)


async def worker():
    print(f"Thread: {threading.get_ident()}, event loop: {id(asyncio.get_running_loop())}")
    loop = asyncio.get_running_loop()
    number_of_threads = 2
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads)
    for _ in range(number_of_threads):
        asyncio.ensure_future(loop.run_in_executor(executor, work, loop))


# get_event_loop() is deprecated when no loop is running, so create one explicitly.
loop = asyncio.new_event_loop()
loop.create_task(worker())
loop.run_forever()

Upvotes: 27

Markus Bergkvist

Reputation: 13

If the proxy server is running all the time it cannot switch back and forth. The proxy listens for client requests and makes them asynchronous, but the other task cannot execute, because this one is serving forever.

If the proxy is a coroutine and is starving the SNMP poller (because it never awaits), aren't the client requests being starved as well?

every coroutine will run forever, they will not end

This should be fine, as long as they await (or yield from). The echo server will also run forever, but that doesn't mean you can't run several servers (on different ports, though) in the same loop.
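
As a rough sketch of that claim, the following runs two echo servers on different ports in a single event loop. The handler, the client helper, and the use of port 0 (which lets the OS pick a free port) are assumptions made for illustration, not code from the question.

```python
import asyncio


async def handle_echo(reader, writer):
    # Echo one line back to the client, then close the connection.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def echo_once(port, message):
    # Tiny client used only to show that both servers respond.
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(message)
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply


async def main():
    # Port 0 asks the OS for any free port; a real setup would use fixed ports.
    server_a = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    server_b = await asyncio.start_server(handle_echo, "127.0.0.1", 0)
    port_a = server_a.sockets[0].getsockname()[1]
    port_b = server_b.sockets[0].getsockname()[1]
    try:
        # Both servers accept connections concurrently on the same loop.
        return await asyncio.gather(
            echo_once(port_a, b"hello a\n"),
            echo_once(port_b, b"hello b\n"),
        )
    finally:
        server_a.close()
        server_b.close()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Neither server starves the other because each handler awaits on its I/O, which hands control back to the loop.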

Upvotes: 0

brunoop

Reputation: 1029

Answering my own question to post my solution:

What I ended up doing was creating a thread, and a new event loop inside that thread, for the polling module, so now every module runs in a different loop. It is not a perfect solution, but it is the only one that made sense to me (I wanted to avoid threads, but since it is only one...). Example:

import asyncio
import threading


def worker():
    second_loop = asyncio.new_event_loop()
    execute_polling_coroutines_forever(second_loop)
    return

threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()

# get_event_loop() is deprecated when no loop is running, so create one explicitly.
loop = asyncio.new_event_loop()
execute_proxy_coroutines_forever(loop)

Asyncio requires that every loop runs its coroutines in a single thread. Using this method you have one event loop for each thread, and they are totally independent: every loop executes its coroutines on its own thread, so that is not a problem. As I said, it's probably not the best solution, but it worked for me.
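
For readers who want to try the one-loop-per-thread idea, here is a minimal, self-contained sketch. It is not the code from my application: poll_once, start_background_loop, and the device name are hypothetical. It also uses asyncio.run_coroutine_threadsafe, which is the thread-safe way to hand a coroutine to a loop running in another thread.

```python
import asyncio
import threading


async def poll_once(device):
    # Hypothetical stand-in for one polling coroutine.
    await asyncio.sleep(0.01)
    return f"polled {device} in {threading.current_thread().name}"


def start_background_loop():
    # Create a second loop and run it forever in its own daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, name="polling-loop", daemon=True)
    thread.start()
    return loop, thread


def main():
    loop, thread = start_background_loop()
    # Submit a coroutine to the other thread's loop; this returns a
    # concurrent.futures.Future that can be waited on from this thread.
    future = asyncio.run_coroutine_threadsafe(poll_once("router-1"), loop)
    result = future.result(timeout=5)
    # Stopping the loop must also be scheduled in a thread-safe way.
    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=5)
    loop.close()
    return result


if __name__ == "__main__":
    print(main())
```

The main thread stays free to run its own loop (the proxy, in my case), while anything that touches the second loop from outside goes through the *_threadsafe calls.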

Upvotes: 34

Nihal Sharma

Reputation: 2437

An asyncio event loop runs in a single thread and will not run anything in parallel; that is how it is designed. The closest thing I can think of is using asyncio.wait.

import asyncio

# Note: @asyncio.coroutine / "yield from" and asyncio.async() have been removed
# from Python; async def, await and asyncio.create_task() replace them.
async def some_work(x, y):
    print("Going to do some heavy work")
    await asyncio.sleep(1.0)
    print(x + y)

async def some_other_work(x, y):
    print("Going to do some other heavy work")
    await asyncio.sleep(3.0)
    print(x * y)

async def main():
    # asyncio.wait expects tasks or futures, not bare coroutines.
    await asyncio.wait([asyncio.create_task(some_work(3, 4)),
                        asyncio.create_task(some_other_work(3, 4))])

if __name__ == '__main__':
    asyncio.run(main())

An alternative is to use asyncio.gather(), which returns a future that aggregates the results of the given awaitables.

async def main():
    # gather wraps the coroutines in tasks itself and returns their results in order.
    await asyncio.gather(some_work(3, 4), some_other_work(3, 4))

Upvotes: 0
