miracle2k

Reputation: 32037

Wait on Python async generators

Say I have two async generators:

import asyncio


async def get_rules():
    while True:
        yield 'rule=1'
        # The sleep must be awaited; a bare call never actually pauses.
        await asyncio.sleep(2)


async def get_snapshots():
    while True:
        yield 'snapshot=1'
        await asyncio.sleep(5)

I want to merge them into a single async generator that yields 2-tuples holding the latest value from each source, much like combineLatest in ReactiveX.

What is the best way to do this?

Upvotes: 7

Views: 4829

Answers (2)

Vincent

Reputation: 13415

You might want to have a look at aiostream, especially stream.merge and stream.accumulate:

import asyncio
from itertools import count
from aiostream import stream


async def get_rules():
    for x in count():
        await asyncio.sleep(2)
        yield 'rule', x


async def get_snapshots():
    for x in count():
        await asyncio.sleep(5)
        yield 'snapshot', x


async def main():
    xs = stream.merge(get_rules(), get_snapshots())
    ys = stream.map(xs, lambda x: {x[0]: x[1]})
    zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {})

    async with zs.stream() as streamer:
        async for z in streamer:
            print(z)


# asyncio.run (Python 3.7+) creates the event loop and closes it afterwards.
asyncio.run(main())

Output:

{}
{'rule': 0}
{'rule': 1}
{'rule': 1, 'snapshot': 0}
{'rule': 2, 'snapshot': 0}
[...]

See the project page and the documentation for further information.

Disclaimer: I am the project maintainer.

Upvotes: 7

miracle2k

Reputation: 32037

I came up with this:

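import asyncio

from aiochannel import Channel


async def combine(**generators):
    """Given a bunch of async generators, merges the events from
    all of them. Each should have a name, i.e. `foo=gen, bar=gen`.
    """
    combined = Channel()

    async def listen_and_forward(name, generator):
        # Forward every value into the shared channel, tagged with its name.
        async for value in generator:
            await combined.put({name: value})

    # Keep references to the tasks so they are not garbage-collected early.
    tasks = [
        asyncio.create_task(listen_and_forward(name, generator))
        for name, generator in generators.items()
    ]

    # Note: the channel is never closed, so this loop does not end on its own.
    async for item in combined:
        yield item


async def combine_latest(**generators):
    """Like "combine", but always includes the latest value from
    every generator.
    """
    current = {}
    async for value in combine(**generators):
        current.update(value)
        # Yield a copy so consumers cannot mutate the shared state.
        yield dict(current)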

Call it like so:

async for item in combine_latest(rules=rulesgen, snap=snapgen):
    print(item)

Output looks like this:

{'rules': 'rule-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
....

I am using aiochannel, but a normal asyncio.Queue should be fine, too.
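
For reference, here is a rough sketch of what the asyncio.Queue variant could look like. It is not the code I actually run, only an illustration under a few assumptions: the `_DONE` sentinel and the `forward` helper are names made up for this example, and errors raised inside a source generator are not propagated to the consumer. Unlike the channel version above, it stops once every source generator is exhausted.

```python
import asyncio

# Hypothetical sentinel: marks that one source generator has finished.
_DONE = object()


async def combine_latest(**generators):
    """Yield a dict with the latest value seen from each named generator."""
    queue = asyncio.Queue()

    async def forward(name, generator):
        # Push every value into the shared queue, tagged with its name.
        try:
            async for value in generator:
                await queue.put((name, value))
        finally:
            # Always signal completion, even on cancellation or error.
            queue.put_nowait(_DONE)

    tasks = [
        asyncio.ensure_future(forward(name, generator))
        for name, generator in generators.items()
    ]
    current = {}
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
                continue
            name, value = item
            current[name] = value
            # Yield a copy so the consumer cannot mutate our state.
            yield dict(current)
    finally:
        # Stop the forwarding tasks if the consumer leaves the loop early.
        for task in tasks:
            task.cancel()
```

It is called the same way as before, e.g. `async for item in combine_latest(rules=get_rules(), snap=get_snapshots())`. If you really want 2-tuples as in the question, something like `(item.get('rules'), item.get('snap'))` inside the loop should do.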

Upvotes: 1
