srjjio

Reputation: 1049

Custom coroutines in Python

I would like to implement and run custom Python coroutines (without using asyncio) to have a better "under the hood" understanding of asynchronous mechanisms.

I expected to be able to use concurrency to start a second task while the first task is waiting, doing nothing.

Here is the synchronous implementation of a stacker (an arbitrary use case).

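from time import sleep, time

def log(*msg):
    # Print the whole seconds elapsed since `start`, followed by the message
    print(int(time() - start), ':', *msg)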

def stack(stack, item):
    sleep(1)
    stack.append(item)

start = time()
words = []
stack(words, 'hello')
log(words)
stack(words, 'world')
log(words)

Here is the output, as I expected:

1 : ['hello']
2 : ['hello', 'world']

Then an attempt at an asynchronous implementation of the same stacker.

def coroutine(func):
    def starter(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return starter

@coroutine
def a_sleep(count):
    while True:
        yield
        sleep(count)

@coroutine
def a_stack(stack):
    while True:
        item = yield
        yield from a_sleep(1)
        stack.append(item)

start = time()
words = []
a_stack(words).send('hello')
log(words)
a_stack(words).send('world')
log(words)

# Wait all tasks to finish
sleep(4)
log(words)

Expected output:

0 : []
1 : ['hello', 'world']
5 : ['hello', 'world']

Real output:

1 : []
2 : []
6 : []

I figure I missed something important. I hope my approach is relevant.

With additional logs, I noticed that the a_stack function never executes the append part.

Upvotes: 1

Views: 897

Answers (2)

user4815162342

Reputation: 154886

The line:

a_stack(words).send('hello')

does two things:

  1. create and run a new generator
  2. send the string hello into the generator

The created generator waits for an item to arrive and then, once resumed, does something with the item. And that is the problem: you never resume the generator. You throw it away, create a new one, and proceed to use it in the same manner. To fix it, your sending code should do something like:

coro = a_stack(words)
coro.send('hello')
log(words)
coro.send('world')
log(words)

But there is another problem. Before actually appending to the stack, a_stack defers its execution to another iterator, which never stops yielding. One way to code a_sleep that fixes the problem is:

@coroutine
def a_sleep(count):
    t0 = time()
    while time() - t0 < count:
        yield 'notyet'

Then you need either a scheduler or at least a more resilient version of send, which can actually deal with a task deferring its execution. A simple (and very inefficient) one could look like this:

def sync_send(c, v):
    while True:
        ret = c.send(v)
        if ret != 'notyet':
            return ret

After replacing coro.send('hello') with sync_send(coro, 'hello'), the expected output is displayed.
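For reference, here is how the pieces above fit together into one script. This is only a sketch: the delay is shortened to 0.2 seconds (my choice, so it runs quickly), and the log calls are replaced by a single print at the end. Note that sync_send blocks until the item has been appended, so the two items are still processed one after the other; running them concurrently is what a scheduler is for.

```python
from time import time

def coroutine(func):
    # Prime the generator so it is paused at its first yield
    def starter(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return starter

@coroutine
def a_sleep(count):
    # Keep yielding 'notyet' until `count` seconds have passed, then finish
    t0 = time()
    while time() - t0 < count:
        yield 'notyet'

@coroutine
def a_stack(stack):
    while True:
        item = yield
        yield from a_sleep(0.2)  # shortened from 1 second (assumption)
        stack.append(item)

def sync_send(c, v):
    # Keep resuming the coroutine until it stops answering 'notyet'
    while True:
        ret = c.send(v)
        if ret != 'notyet':
            return ret

words = []
coro = a_stack(words)          # create the generator once and keep it
sync_send(coro, 'hello')
sync_send(coro, 'world')
print(words)  # ['hello', 'world']
```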

A real scheduler would never busy-loop; it would be instructed by sleep and by other potentially blocking calls, such as reads from files or sockets, which IO/timing events it must wait for. After the appropriate event arrives, it would wake up the correct task. This is the core of what asyncio does.
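To make that idea concrete, here is a minimal sketch of a timer-only scheduler that does not busy-loop. Everything in it is made up for illustration: the ('sleep', delay) request tuple, the run function, and the a_stack signature that takes the item and delay as arguments. It is nowhere near what asyncio really does (no IO, no cancellation, no error handling), but it shows tasks telling the scheduler what they are waiting for.

```python
import heapq
from collections import deque
from time import monotonic, sleep

def a_sleep(delay):
    # Hypothetical protocol: ask the scheduler to wake this task after `delay` seconds
    yield ('sleep', delay)

def a_stack(stack, item, delay):
    yield from a_sleep(delay)
    stack.append(item)

def run(tasks):
    """Run generator tasks to completion, blocking only until the next wake-up time."""
    ready = deque(tasks)
    sleeping = []  # heap of (wake_time, sequence_number, task)
    seq = 0
    while ready or sleeping:
        if not ready:
            # Nothing can run right now: block until the earliest timer is due
            wake_time, _, task = heapq.heappop(sleeping)
            remaining = wake_time - monotonic()
            if remaining > 0:
                sleep(remaining)
            ready.append(task)
        task = ready.popleft()
        try:
            request = next(task)
        except StopIteration:
            continue  # the task has finished
        _, delay = request  # this sketch only understands ('sleep', delay)
        seq += 1  # tie-breaker so the heap never has to compare generators
        heapq.heappush(sleeping, (monotonic() + delay, seq, task))

words = []
start = monotonic()
run([a_stack(words, 'hello', 0.3), a_stack(words, 'world', 0.3)])
elapsed = monotonic() - start
print(words)  # ['hello', 'world']
```

Both tasks register their timers before either one is due, so the whole run takes roughly one delay rather than two.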

To learn more about how generators and yield from are used as a core abstraction for asynchronous programming, I recommend the excellent lecture Python Concurrency From the Ground Up by Dave Beazley. In the lecture Dave implements a coroutine scheduler in front of a live audience, showing the design of his curio library.

Upvotes: 1

Martijn Pieters

Reputation: 1121654

Your problem is that your generators are paused at the yield expression in the a_sleep() function (via the yield from a_sleep(1) delegation). The generator there is also infinite and so will never return. You can never advance your generators far enough to reach the stack.append(item) calls.

I think you misunderstood what yield from does here. yield from moves control of the generator to another generator; that other generator has to complete iteration before the yield from expression completes and returns:

>>> @coroutine
... def a_sleep(count):
...     while True:
...         yield 'sleeping'  # to illustrate where we are stuck
...         sleep(count)
...
>>> words = []
>>> g = a_stack(words)
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
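
A stripped-down illustration of that rule may help. The inner and outer generators below are made-up names, not part of your code; the point is only that the line after yield from does not run until the delegated generator is exhausted.

```python
def inner():
    yield 'inner-1'
    yield 'inner-2'
    return 'inner result'  # becomes the value of the `yield from` expression

def outer(log):
    result = yield from inner()  # suspended here until inner() is exhausted
    log.append(result)
    yield 'outer-1'

log = []
g = outer(log)
steps = [next(g), next(g)]
log_while_delegating = list(log)  # still empty: outer() has not moved past `yield from`
steps.append(next(g))             # inner() finishes, outer() resumes and appends

print(steps)  # ['inner-1', 'inner-2', 'outer-1']
print(log)    # ['inner result']
```

With a while True loop inside the delegated generator, the equivalent of the log.append line is never reached.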

Instead of using sleep() and an infinite loop, record the time, and loop until the time has passed:

>>> @coroutine
... def a_sleep(count):
...     start = time()
...     while int(time() - start) < count:
...         yield 'sleeping'
...
>>> g = a_stack(words)
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
>>> words
['hello']

You'll have to keep iterating over your generators (in a loop, perhaps?) to have them alternate execution.
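
Such a loop could look roughly like this. It is a sketch under a few assumptions of mine: the delay is 0.2 seconds instead of 1 (so the int() truncation is dropped), and the pending list of (generator, item) pairs is just an ad-hoc structure for the example. It still busy-loops, so it burns CPU while waiting.

```python
from time import time

def coroutine(func):
    # Prime the generator so it is paused at its first yield
    def starter(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return starter

@coroutine
def a_sleep(count):
    start = time()
    while time() - start < count:
        yield 'sleeping'

@coroutine
def a_stack(stack):
    while True:
        item = yield
        yield from a_sleep(0.2)  # shortened delay (assumption)
        stack.append(item)

words = []
begin = time()
# One generator per item, so both can be "asleep" at the same time
pending = [(a_stack(words), 'hello'), (a_stack(words), 'world')]
while pending:
    for entry in list(pending):
        gen, item = entry
        # Anything other than 'sleeping' means the item has been appended
        if gen.send(item) != 'sleeping':
            pending.remove(entry)
elapsed = time() - begin
print(words, round(elapsed, 1))
```

Because the two generators take turns, both items are appended after about one delay in total rather than two.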

The asyncio.sleep() function is of course much more efficient than that; it uses a Future() object that attaches to the AbstractEventLoop.call_later() functionality offered by the event loop. The loop lets the future object know when the time is up, at which point the future is marked 'ready' and the coroutine that produced it is continued again.
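
Here is a rough imitation of that mechanism using public asyncio calls. It is not the actual asyncio.sleep() source; sleep_via_future is a name I made up, and this a_stack takes the item and delay as arguments to keep the example short.

```python
import asyncio

async def sleep_via_future(delay):
    # Hypothetical stand-in for asyncio.sleep(): create a future and ask the
    # event loop to complete it after `delay` seconds
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_later(delay, fut.set_result, None)
    await fut  # suspends this coroutine until the loop sets the result

async def a_stack(stack, item, delay):
    await sleep_via_future(delay)
    stack.append(item)

async def main():
    words = []
    loop = asyncio.get_running_loop()
    start = loop.time()
    # Both coroutines wait on their own future at the same time
    await asyncio.gather(
        a_stack(words, 'hello', 0.2),
        a_stack(words, 'world', 0.2),
    )
    return words, loop.time() - start

words, elapsed = asyncio.run(main())
print(words, round(elapsed, 1))
```

While both futures are pending, the event loop has nothing to run and simply waits for the first timer, which is the part the busy-looping versions cannot do.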

Upvotes: 2
