Reputation: 373
I've been trying to learn a bit about asyncio, and I'm seeing some unexpected behavior. I've set up a simple Fibonacci server that supports multiple connections using streams. The fib calculation is written recursively, so I can simulate long-running calculations by entering a large number. As expected, a long-running calculation blocks I/O until it completes.
Here's the problem, though. I rewrote the fibonacci function as a coroutine. I expected that by yielding from each recursion, control would fall back to the event loop, pending I/O tasks would get a chance to execute, and you'd even be able to run multiple fib calculations concurrently. That doesn't seem to be the case, however.
Here's the code:
import asyncio

@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = yield from fib(n)
        writer.write('{}\n'.format(result).encode('ascii'))
        yield from writer.drain()
    writer.close()
    print("Closed")

def server(address):
    loop = asyncio.get_event_loop()
    fib_server = asyncio.start_server(fib_handler, *address, loop=loop)
    fib_server = loop.run_until_complete(fib_server)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('closing...')
    fib_server.close()
    loop.run_until_complete(fib_server.wait_closed())
    loop.close()

server(('', 25000))
This server runs perfectly well if you netcat to port 25000 and start entering numbers. However, if you start a long-running calculation (say, 35), no other calculations will run until the first completes. In fact, additional connections won't even be processed.
I know that the event loop is feeding back the yields from the recursive fib calls, so control has to be falling all the way down. But I thought that the loop would process the other calls in the I/O queues (such as spawning a second fib_handler) before "trampolining" back to the fib function.
I'm sure I must be misunderstanding something, or there's some kind of bug I'm overlooking, but I can't for the life of me find it.
Any insight you can provide will be much appreciated.
Upvotes: 2
Views: 385
Reputation: 94881
The first issue is that you're calling yield from fib(n) inside of fib_handler. Including yield from means that fib_handler will block until the call to fib(n) is complete, which means it can't handle any input you provide while fib is running. You would have this problem even if all you did was I/O inside of fib. To fix this, you should use asyncio.async(fib(n)) (or preferably asyncio.ensure_future(fib(n)), if you have a new enough version of Python) to schedule fib with the event loop without actually blocking fib_handler. From there, you can use Future.add_done_callback to write the result to the client when it's ready:
import asyncio
from functools import partial

@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    a = yield from fib(n-1)
    b = yield from fib(n-2)
    return a + b

def do_it(writer, result):
    # Runs when the fib task completes; sends the result to the client.
    writer.write('{}\n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = asyncio.async(fib(n))
        # Write the result to the client when fib(n) is done.
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")
That said, this change alone still won't completely fix the problem; while it will allow multiple clients to connect and issue commands concurrently, a single client will still get synchronous behavior. This happens because when you call yield from coro() directly on a coroutine function, control isn't given back to the event loop until coro() (or another coroutine called by coro) actually executes some non-blocking I/O. Otherwise, Python will just execute coro without yielding control. This is a useful performance optimization, since giving control to the event loop when your coroutine isn't actually going to do blocking I/O is a waste of time, especially given Python's high function call overhead.
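To make that concrete, here's a minimal sketch using the same Python 3.4-era asyncio API as the rest of this answer (no_io and the print labels are names made up for illustration). A callback scheduled with call_soon can't run across a bare yield from of a coroutine that does no I/O, but it does run once the same call is wrapped in a Task:

import asyncio

@asyncio.coroutine
def no_io():
    return 42  # does no I/O, so it never suspends

@asyncio.coroutine
def demo():
    loop = asyncio.get_event_loop()
    loop.call_soon(print, "callback")
    yield from no_io()                 # runs inline; the loop never regains control here
    print("after bare yield from")     # prints *before* "callback"
    yield from asyncio.async(no_io())  # Task wrapping forces a trip through the event loop
    print("after Task-wrapped call")   # "callback" has printed by now

asyncio.get_event_loop().run_until_complete(demo())

Running this prints "after bare yield from", then "callback", then "after Task-wrapped call".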
In your case, fib never does any I/O, so once you call yield from fib(n-1) inside of fib itself, the event loop never gets to run again until it's done recursing, which will block fib_handler from reading any subsequent input from the client until the call to fib is done. Wrapping all your calls to fib in asyncio.async guarantees that control is given to the event loop each time you make a yield from asyncio.async(fib(...)) call. When I made this change, in addition to using asyncio.async(fib(n)) in fib_handler, I was able to process multiple inputs from a single client concurrently. Here's the full example code:
import asyncio
from functools import partial

@asyncio.coroutine
def fib(n):
    if n < 1:
        return 1
    # Wrapping each recursive call in asyncio.async hands control back to
    # the event loop on every yield from, instead of recursing uninterrupted.
    a = yield from asyncio.async(fib(n-1))
    b = yield from asyncio.async(fib(n-2))
    return a + b

def do_it(writer, result):
    writer.write('{}\n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = asyncio.async(fib(n))
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")
Input/Output on client-side:
dan@dandesk:~$ netcat localhost 25000
35        # input
4         # input
8         # output for 4 (returned first)
24157817  # output for 35
Now, even though this works, I wouldn't use this implementation, since it's doing a bunch of CPU-bound work in a single-threaded program that also wants to serve I/O in that same thread. This isn't going to scale very well, and won't have ideal performance. Instead, I'd recommend using loop.run_in_executor to run the calls to fib in a background process, which allows the asyncio thread to run at full capacity and also allows us to scale the calls to fib across multiple cores:
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    # Plain (non-coroutine) fib; it runs in a worker process, so it can
    # block without stalling the event loop.
    if n < 1:
        return 1
    a = fib(n-1)
    b = fib(n-2)
    return a + b

def do_it(writer, result):
    writer.write('{}\n'.format(result.result()).encode('ascii'))
    asyncio.async(writer.drain())

@asyncio.coroutine
def fib_handler(reader, writer):
    print('Connection from : {}'.format(writer.transport.get_extra_info('peername')))
    executor = ProcessPoolExecutor(8)  # 8 processes in the pool
    loop = asyncio.get_event_loop()
    while True:
        req = yield from reader.readline()
        if not req:
            break
        print(req)
        n = int(req)
        result = loop.run_in_executor(executor, fib, n)
        result.add_done_callback(partial(do_it, writer))
    writer.close()
    print("Closed")
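As a side note, on Python 3.5+ asyncio.async is deprecated in favor of asyncio.ensure_future (and async later became a keyword, making the old name a syntax error), so today this pattern would be written with async/await. Here's a rough sketch of the executor version in that style; compute_and_reply is a helper name invented for this illustration:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    if n < 1:
        return 1
    return fib(n-1) + fib(n-2)

executor = ProcessPoolExecutor(8)

async def compute_and_reply(writer, n):
    # Offload the CPU-bound call to a worker process, then reply.
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, fib, n)
    writer.write('{}\n'.format(result).encode('ascii'))
    await writer.drain()

async def fib_handler(reader, writer):
    while True:
        req = await reader.readline()
        if not req:
            break
        n = int(req)
        # Schedule the reply as a task instead of awaiting it, so a single
        # client can have several calculations in flight at once.
        asyncio.ensure_future(compute_and_reply(writer, n))
    writer.close()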
Upvotes: 3