Jamie Lindsey
Jamie Lindsey

Reputation: 993

How to implement "new style" Python AsyncIO iterator function from an "old style" class?

I am trying to learn Python AsyncIO but having a lot of trouble with finding good tutorials that are up to date etc.

Let's say we have this "old style" async iterator class:

def chain(sink, *coro_pipeline):
    f = sink
    for coro_func, coro_args, coro_kwargs in coro_pipeline:
        f = coro_func(f, *coro_args, **coro_kwargs)
    return f

class sendable_deque(collections.deque):
    send = collections.deque.append


class AsyncIterator(object):

    def __init__(self, f, buf_size, *coro_pipeline):
        self.events = sendable_deque()
        self.coro = chain(self.events, *coro_pipeline)
        self.coro_finished = False
        self.f = f
        self.buf_size = buf_size

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.events:
            return self.events.popleft()
        if self.coro_finished:
            raise StopAsyncIteration
        while True:
            data = await self.f.read(self.buf_size)
            try:
                self.coro.send(data)
                if self.events:
                    return self.events.popleft()
            except StopIteration:
                self.coro_finished = True
                if self.events:
                    return self.events.popleft()
                raise StopAsyncIteration

# An example of a pipeline
def _parse_pipeline(parser, config):
    return (
        (parser['parse_basecoro'], [], {}),
        (parser['basic_parse_basecoro'], [], config)
    )

How is this now implemented? As I am lead to believe we use something more like:

async def async_parse(f, buf_size):
    while True:
         yield f.read(buf_size)

async def parse(f, buf_size):
    async with async_parse(f, buf_size) as pf:
        print(pf)

But the code in the class is using a chaining method which I believe is done with asyncio.gather()?

Any good modern guides/tutorials would be a great help also.

Upvotes: 1

Views: 255

Answers (1)

user4815162342
user4815162342

Reputation: 154911

Using an async generator the above hand-rolled async iterator can be expressed as follows (untested):

async def async_iterable(f, buf_size, coro_pipeline):
    events = sendable_deque()
    coro = utils.chain(events, *coro_pipeline)
    done = False
    while True:
        while events:
            yield events.popleft()
        if done:
            break
        data = await f.read(buf_size)
        try:
            coro.send(data)
        except StopIteration:
            done = True

In the long run you might want to reconsider the use of bi-directional generators and replace them with native asyncio constructs such as queues.

Upvotes: 1

Related Questions