Reputation: 1049
I would like to implement and run custom Python coroutines (without using asyncio) to have a better "under the hood" understanding of asynchronous mechanisms.
I was expected to be able to use concurrency to start a second task when the first task is waiting, doing nothing.
Here the synchronous implementation of a stacker (which is an arbitrary use case).
def log(*msg):
print(int(time() - start), ':', *msg)
def stack(stack, item):
sleep(1)
stack.append(item)
start = time()
words = []
stack(words, 'hello')
log(words)
stack(words, 'world')
log(words)
Here the output, as I was expected:
1 : ['hello']
2 : ['hello', 'world']
Then an attempt of the asynchronous implementation of the same stacker.
def coroutine(func):
def starter(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return starter
@coroutine
def a_sleep(count):
while True:
yield
sleep(count)
@coroutine
def a_stack(stack):
while True:
item = yield
yield from a_sleep(1)
stack.append(item)
start = time()
words = []
a_stack(words).send('hello')
log(words)
a_stack(words).send('world')
log(words)
# Wait all tasks to finish
sleep(4)
log(words)
Expected output:
0 : []
1 : ['hello', 'world']
5 : ['hello', 'world']
Real output:
1 : []
2 : []
6 : []
I figure I missed something important. I hope my approach is relevant.
With additional logs, I have noticed that the a_stack function never execute the append part.
Upvotes: 1
Views: 897
Reputation: 154886
The line:
a_stack(words).send('hello')
does two things:
hello
into the generatorThe created generator waits for item to arrive, and then, once resumed, does something with the item. And that is the problem, you never resume the generator, you throw it away and create a new one, and proceed to use in the same manner. To fix it, your sending code should do something like:
coro = a_stack(words)
coro.send('hello')
log(words)
coro.send('world')
log(words)
But there is another problem. Before actually appending to the stack, a_stack
defers its execution to another iterator, which never stops yielding. One way to code a_sleep
that fixes the problem is:
@coroutine
def a_sleep(count):
t0 = time()
while time() - t0 < count:
yield 'notyet'
Then you need either a scheduler or at least a more resilient version of send
, which can actually deal with a task deferring its execution. A simple (and very inefficient) one could looks like this:
def sync_send(c, v):
while True:
ret = c.send(v)
if ret != 'notyet':
return ret
After replacing coro.send('hello')
with sync_send(coro, 'hello')
, the expected output is displayed.
A real scheduler would never busy-loop; it would be instructed by sleep
and by other potentially blocking calls, such as reads from files or sockets, which IO/timing events it must wait for. After the appropriate event arrived, it would wake up the correct task. This is the core of what asyncio
does.
To learn more about how generators and yield from
are used as a core abstraction for asynchronous programming, I recommend the excellent lecture Python Concurrency From the Ground by Dave Beazley. In the lecture Dave implements a coroutine scheduler in front of live audience, showing the design of his curio library.
Upvotes: 1
Reputation: 1121654
Your problem is that your generators are paused at the yield
expression in the a_sleep()
function (via the yield from a_sleep(1)
) delegation. The generator there is also infinite and so will never return. You can never advance your generators far enough to reach the stack.append(item)
calls.
I think you misunderstood what yield from
does here. yield from
moves control of the generator to another generator; that other generator has to complete iteration before the yield from
expression completes and returns:
>>> @coroutine
... def a_sleep(count):
... while True:
... yield 'sleeping' # to illustrate where we are stuck
... sleep(count)
...
>>> words = []
>>> g = a_stack(words)
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
Instead of using sleep()
and an infinite loop, record the time, and loop until the time has passed:
>>> @coroutine
... def a_sleep(count):
... start = time()
... while int(time() - start) < count:
... yield 'sleeping'
...
>>> g = a_stack(words)
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
>>> words
['hello']
You'll have to keep iterating over your generators (in a loop, perhaps?) to have them alternate execution.
The asyncio.sleep()
function is of course much more efficient than that; it uses a Future()
object that attaches to the AbstactEventLoop.call_later()
functionality offered by the event loop. The loop lets the future object know when the time is up, at which point the future is marked 'ready' and the coroutine that produced it is continued again.
Upvotes: 2