Reputation: 993
I am trying to learn Python asyncio, but I am having a lot of trouble finding good, up-to-date tutorials.
Let's say we have this "old style" async iterator class:
def chain(sink, *coro_pipeline):
    """Wrap ``sink`` in each stage of ``coro_pipeline`` and return the outermost stage.

    Each pipeline entry is a ``(coro_func, coro_args, coro_kwargs)`` triple.
    Stages are applied in order: every ``coro_func`` is called with the
    previous result as its first argument, followed by ``*coro_args`` and
    ``**coro_kwargs``. With an empty pipeline, ``sink`` itself is returned.
    """
    f = sink
    for coro_func, coro_args, coro_kwargs in coro_pipeline:
        f = coro_func(f, *coro_args, **coro_kwargs)
    return f
class sendable_deque(collections.deque):
    """A deque that also exposes ``send()`` as an alias of ``append()``.

    The alias lets a deque be used wherever code pushes values with
    ``target.send(value)``; sent values are appended on the right.
    """

    send = collections.deque.append
class AsyncIterator:
    """Async iterator that pushes chunks read from ``f`` through a coroutine pipeline.

    Chunks are obtained with ``await f.read(buf_size)`` and sent into the
    pipeline built by ``chain()``; whatever the pipeline sends to its sink
    accumulates in ``self.events`` and is handed out one item per
    ``__anext__`` call. Iteration ends once the pipeline raises
    ``StopIteration`` from ``send()`` and all buffered events are drained.
    """

    def __init__(self, f, buf_size, *coro_pipeline):
        """Build the pipeline on top of an event buffer.

        ``f`` must provide an awaitable ``read(size)``; ``coro_pipeline``
        entries are the ``(func, args, kwargs)`` triples taken by ``chain()``.
        """
        self.events = sendable_deque()
        self.coro = chain(self.events, *coro_pipeline)
        self.coro_finished = False
        self.f = f
        self.buf_size = buf_size

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next buffered event, reading more input when needed."""
        # Drain events produced by an earlier chunk before reading again.
        if self.events:
            return self.events.popleft()
        if self.coro_finished:
            raise StopAsyncIteration
        while True:
            data = await self.f.read(self.buf_size)
            try:
                self.coro.send(data)
            except StopIteration:
                # The pipeline is exhausted; it may still have emitted
                # events while handling this final chunk.
                self.coro_finished = True
                if self.events:
                    return self.events.popleft()
                # Hide the internal StopIteration from the caller's traceback.
                raise StopAsyncIteration from None
            # A chunk may yield no events at all; keep reading in that case.
            if self.events:
                return self.events.popleft()
# An example of a pipeline
def _parse_pipeline(parser, config):
    """Return the two-stage pipeline description for ``parser``.

    The result is a tuple of ``(func, args, kwargs)`` triples: first
    ``parser['parse_basecoro']`` with no arguments, then
    ``parser['basic_parse_basecoro']`` with ``config`` as its keyword
    arguments.
    """
    return (
        (parser['parse_basecoro'], [], {}),
        (parser['basic_parse_basecoro'], [], config),
    )
How is this implemented now? As I am led to believe, we use something more like:
async def async_parse(f, buf_size):
    """Asynchronously yield successive chunks read from ``f``.

    Each chunk is the awaited result of ``f.read(buf_size)``. The generator
    stops at the first empty (falsy) chunk, which is not yielded.

    Assumes ``f.read`` is awaitable and returns an empty value at end of
    input -- confirm against the reader actually used.
    """
    while True:
        # Await the read so consumers receive data, not a coroutine object.
        data = await f.read(buf_size)
        if not data:
            break
        yield data
async def parse(f, buf_size):
    """Print every item produced by ``async_parse(f, buf_size)``."""
    # An async generator is consumed with ``async for``; it does not implement
    # the async context-manager protocol, so ``async with`` would fail here.
    async for pf in async_parse(f, buf_size):
        print(pf)
But the code in the class uses a chaining method, which I believe is now done with asyncio.gather()?
Any good modern guides/tutorials would be a great help also.
Upvotes: 1
Views: 255
Reputation: 154911
Using an async generator, the above hand-rolled async iterator can be expressed as follows (untested):
async def async_iterable(f, buf_size, coro_pipeline):
    """Async-generator equivalent of the hand-rolled ``AsyncIterator`` class.

    Chunks obtained with ``await f.read(buf_size)`` are sent into the
    pipeline built by ``chain()``; events the pipeline pushes into the
    buffer are yielded in order. The generator finishes once the pipeline
    raises ``StopIteration`` from ``send()`` and the buffer is drained.

    ``coro_pipeline`` is a single iterable of ``(func, args, kwargs)``
    triples (not variadic arguments).
    """
    events = sendable_deque()
    # Use the module-level chain(), the same way sendable_deque is referenced.
    coro = chain(events, *coro_pipeline)
    done = False
    while True:
        # Hand out everything buffered so far before reading more input.
        while events:
            yield events.popleft()
        if done:
            break
        data = await f.read(buf_size)
        try:
            coro.send(data)
        except StopIteration:
            # Loop once more so events emitted by the final send are yielded.
            done = True
In the long run you might want to reconsider the use of bi-directional generators and replace them with native asyncio constructs such as queues.
Upvotes: 1