samfrances
samfrances

Reputation: 3685

Merging async iterables in python3

Is there a good way, or a well-supported library, for merging async iterators in python3?

The desired behavior is basically the same as that of merging observables in reactivex.

That is, in the normal case, if I'm merging two async iterator, I want the resulting async iterator to yield results chronologically. An error in one of the iterators should derail the merged iterator.

Merging Observables

(Source: http://reactivex.io/documentation/operators/merge.html)

This is my best attempt, but it seems like something there might be a standard solution to:

async def drain(stream, q, sentinal=None):
    try:
        async for item in stream:
            await q.put(item)
        if sentinal:
            await q.put(sentinal)
    except BaseException as e:
        await q.put(e)


async def merge(*streams):

    q = asyncio.Queue()
    sentinal = namedtuple("QueueClosed", ["truthy"])(True)

    futures = {
        asyncio.ensure_future(drain(stream, q, sentinal)) for stream in streams
    }

    remaining = len(streams)
    while remaining > 0:
        result = await q.get()
        if result is sentinal:
            remaining -= 1
            continue
        if isinstance(result, BaseException):
            raise result
        yield result


if __name__ == "__main__":

    # Example: Should print:
    #   1
    #   2
    #   3
    #   4

    loop = asyncio.get_event_loop()

    async def gen():
        yield 1
        await asyncio.sleep(1.5)
        yield 3

    async def gen2():
        await asyncio.sleep(1)
        yield 2
        await asyncio.sleep(1)
        yield 4

    async def go():
        async for x in merge(gen(), gen2()):
            print(x)

    loop.run_until_complete(go())

Upvotes: 8

Views: 2211

Answers (1)

Vincent
Vincent

Reputation: 13415

You can use aiostream.stream.merge:

from aiostream import stream

async def go():
    async for x in stream.merge(gen(), gen2()):
        print(x)

More examples in the documentation and this answer.

Upvotes: 10

Related Questions