Laharah

Reputation: 373

asyncio tasks getting unexpectedly deferred

I've been trying to learn a bit about asyncio, and I'm having some unexpected behavior. I've set up a simple Fibonacci server that supports multiple connections using streams. The fib calculation is written recursively, so I can simulate long-running calculations by entering a large number. As expected, a long-running calculation blocks I/O until it completes.

Here's the problem though. I rewrote the fibonacci function as a coroutine. I expected that by yielding from each recursion, control would fall back to the event loop, waiting I/O tasks would get a chance to execute, and that you'd even be able to run multiple fib calculations concurrently. This, however, doesn't seem to be the case.

Here's the code:

import asyncio

@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b


@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = yield from fib(n)
        writer.write('{}\n'.format(result).encode('ascii'))
        yield from writer.drain()
    writer.close()
    print("Closed")


def server(address):
    loop = asyncio.get_event_loop()
    fib_server = asyncio.start_server(fib_handler, *address, loop=loop)
    fib_server = loop.run_until_complete(fib_server)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('closing...')
        fib_server.close()
        loop.run_until_complete(fib_server.wait_closed())
        loop.close()


server(('', 25000))

This server runs perfectly well if you netcat to port 25000 and start entering numbers. However, if you start a long-running calculation (say 35), no other calculations will run until the first completes. In fact, additional connections won't even be processed.

I know that the event loop is feeding back the yields from recursive fib calls, so control has to be falling all the way down. But I thought that the loop would process the other calls in the I/O queues (such as spawning a second fib_handler) before "trampolining" back to the fib function.

I'm sure I must be misunderstanding something or that there is some kind of bug I'm overlooking but I can't for the life of me find it.

Any insight you can provide will be much appreciated.

Upvotes: 2

Views: 385

Answers (1)

dano

Reputation: 94881

The first issue is that you're calling yield from fib(n) inside of fib_handler. Including yield from means that fib_handler will block until the call to fib(n) is complete, which means it can't handle any input you provide while fib is running. You would have this problem even if all you did was I/O inside of fib. To fix this, you should use asyncio.async(fib(n)) (or preferably, asyncio.ensure_future(fib(n)), if you have a new enough version of Python) to schedule fib with the event loop, without actually blocking fib_handler. From there, you can use Future.add_done_callback to write the result to the client when it's ready:

import asyncio
from functools import partial

@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b

def do_it(writer, result):
    # Runs via add_done_callback when the fib task finishes; send the
    # result back to the client and schedule the drain.
    writer.write('{}\n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = asyncio.async(fib(n))
        # Write the result to the client when fib(n) is done.
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")

That said, this change alone still won't completely fix the problem; while it will allow multiple clients to connect and issue commands concurrently, a single client will still get synchronous behavior. The reason is that when you call yield from coro() directly on a coroutine function, control isn't given back to the event loop until coro() (or another coroutine called by coro) actually suspends, e.g. on non-blocking I/O. Otherwise, Python just executes coro to completion without yielding control. This is a useful performance optimization, since handing control to the event loop when your coroutine isn't actually going to wait for anything is a waste of time, especially given Python's high function call overhead.
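
To see this scheduling difference in isolation, here's a minimal toy sketch (separate from the fib server, with made-up names): ticker stands in for fib_handler and can only run when the loop has control, while spin stands in for the CPU-bound fib:

import asyncio

@asyncio.coroutine
def ticker():
    # Needs the event loop to get control, just like fib_handler does.
    for i in range(3):
        print('tick', i)
        yield from asyncio.sleep(0)

@asyncio.coroutine
def spin(n):
    # Pure CPU-bound recursion, just like fib: no I/O anywhere.
    if n == 0:
        return
    print('spin', n)
    # Plain delegation: the loop can't run until the whole recursion finishes.
    yield from spin(n - 1)
    # Swap in the line below instead and the ticks interleave with the spins:
    # yield from asyncio.async(spin(n - 1))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(ticker(), spin(10)))
loop.close()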

In your case, fib never does any I/O, so once you call yield from fib(n-1) inside of fib itself, the event loop never gets to run again until it's done recursing, which blocks fib_handler from reading any subsequent input from the client until the call to fib is done. Wrapping all your calls to fib in asyncio.async guarantees that control is given back to the event loop on every yield from asyncio.async(fib(...)) call. With that change made inside fib, in addition to using asyncio.async(fib(n)) in fib_handler, I was able to process multiple inputs from a single client concurrently. Here's the full example code:

import asyncio
from functools import partial

@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    # Wrap each recursive call in a Task so control returns to the event
    # loop on every step of the recursion.
    a = yield from asyncio.async(fib(n-1))
    b = yield from asyncio.async(fib(n-2))
    return a + b

def do_it(writer, result):
    writer.write('{}\n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = asyncio.async(fib(n))
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")

Input/Output on client-side:

dan@dandesk:~$ netcat localhost 25000
35        # input
4         # input
8         # output (result for 4)
24157817  # output (result for 35)
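
Note that the result for 4 comes back before the result for 35: both calculations are in flight at the same time, and the smaller one simply finishes first.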

Now, even though this works, I wouldn't use this implementation, since it's doing a bunch of CPU-bound work in a single-threaded program that also wants to serve I/O in that same thread. This isn't going to scale well, and performance will suffer. Instead, I'd recommend using loop.run_in_executor to run the calls to fib in a background process, which keeps the asyncio thread free to serve I/O at full capacity and also lets the fib calls scale across multiple cores:

import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    # Plain synchronous fib again; it runs inside a worker process, so it
    # can block without stalling the event loop.
    if n < 1:
        return 1
    a = fib(n-1)
    b = fib(n-2)
    return a + b

def do_it(writer, result):
    writer.write('{}\n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    executor = ProcessPoolExecutor(8)  # 8 Processes in the pool
    loop = asyncio.get_event_loop()
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        # Hand fib off to the process pool; run_in_executor returns a
        # future we can attach the same done-callback to.
        result = loop.run_in_executor(executor, fib, n)
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")

Upvotes: 3
