Reputation: 32037
Say I have two async generators:
async def get_rules():
while True:
yield 'rule=1'
asyncio.sleep(2)
async def get_snapshots():
while True:
yield 'snapshot=1'
asyncio.sleep(5)
I want to merge them into a single async generator that returns 2-tuples, with the latest value from both. Sort of combineLatest
.
What is the best way to do this?
Upvotes: 7
Views: 4829
Reputation: 13415
You might want to have a look at aiostream, especially stream.merge and stream.accumulate:
import asyncio
from itertools import count
from aiostream import stream
async def get_rules():
for x in count():
await asyncio.sleep(2)
yield 'rule', x
async def get_snapshots():
for x in count():
await asyncio.sleep(5)
yield 'snapshot', x
async def main():
xs = stream.merge(get_rules(), get_snapshots())
ys = stream.map(xs, lambda x: {x[0]: x[1]})
zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {})
async with zs.stream() as streamer:
async for z in streamer:
print(z)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Output:
{}
{'rule': 0}
{'rule': 1}
{'rule': 1, 'snapshot': 0}
{'rule': 2, 'snapshot': 0}
[...]
See the project page and the documentation for further information.
Disclaimer: I am the project maintainer.
Upvotes: 7
Reputation: 32037
I came up with this:
async def combine(**generators):
"""Given a bunch of async generators, merges the events from
all of them. Each should have a name, i.e. `foo=gen, bar=gen`.
"""
combined = Channel()
async def listen_and_forward(name, generator):
async for value in generator:
await combined.put({name: value})
for name, generator in generators.items():
asyncio.Task(listen_and_forward(name, generator))
async for item in combined:
yield item
async def combine_latest(**generators):
"""Like "combine", but always includes the latest value from
every generator.
"""
current = {}
async for value in combine(**generators):
current.update(value)
yield current
Call it like so:
async for item in combine_latest(rules=rulesgen, snap=snapgen):
print(item)
Output looks like this:
{'rules': 'rule-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
....
I am using aiochannel, but a normal asyncio.Queue should be fine, too.
Upvotes: 1