0x1337

Reputation: 1104

Parallel async tasks emitted by endless generator

I have code that is very close to this:

import asyncio

class Parser:
    async def fetch(self, url):
        # `get` stands in for some async HTTP helper (e.g. aiohttp)
        html = await get(url)
        return html

    @property
    def endless_generator(self):
        while True:
            yield url  # the URL source is elided in this sketch

    async def loop(self):
        htmls = []
        for url in self.endless_generator:
            htmls.append(await self.fetch(url))
        return htmls

    def run(self):
        loop = asyncio.get_event_loop()
        try:
            htmls = loop.run_until_complete(self.loop())
        finally:
            loop.close()

parser = Parser()
parser.run()

Right now Parser.loop runs synchronously: each fetch is awaited before the next URL is taken from the generator.

I've tried asyncio.wait and asyncio.gather to achieve async invocations of Parser.fetch, but I don't know the number of URLs in advance (because the URLs are yielded by an endless generator).

So, how do I get asynchronous calls if the number of tasks is not known in advance?

Upvotes: 0

Views: 216

Answers (1)

user4815162342

Reputation: 155515

I've tried asyncio.wait and asyncio.gather to achieve async invocations of Parser.fetch, but I don't know the number of URLs in advance (because the URLs are yielded by an endless generator).

I assume that by endless generator you mean a generator whose number of URLs is not known in advance, rather than a truly endless generator (generating an infinite list). Here is a version that creates a task as soon as a URL is available, keeps at most max_parallel fetches in flight, and gathers the results as they arrive:

async def loop(self, max_parallel=10):
    lp = asyncio.get_event_loop()
    tasks = set()
    result = {}
    any_done = asyncio.Event()

    def _task_done(t):
        tasks.remove(t)
        any_done.set()
        result[t.fetch_url] = t.result()

    for url in self.endless_generator:
        new_task = lp.create_task(self.fetch(url))
        new_task.fetch_url = url
        tasks.add(new_task)
        new_task.add_done_callback(_task_done)
        # Backpressure: once max_parallel fetches are in flight, wait
        # for at least one to finish before queuing the next URL.
        # (Waiting unconditionally after every task would allow only
        # one fetch at a time.)
        while len(tasks) >= max_parallel:
            await any_done.wait()
            any_done.clear()

    # All URLs have been queued; drain the remaining tasks.
    while tasks:
        await any_done.wait()
        any_done.clear()

    return result  # mapping url -> html
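
For reference, driving this coroutine looks the same as in the question's run() method:

parser = Parser()
lp = asyncio.get_event_loop()
try:
    url_to_html = lp.run_until_complete(parser.loop())
finally:
    lp.close()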

One cannot simply call gather or wait on each iteration, because that would wait for all of the tasks created so far to finish before queuing a new one. wait(return_when=FIRST_COMPLETED) could work, but it would be O(n**2) in the number of tasks, because it sets up its own done-callback on every pending task anew on each call.
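
For illustration, here is a minimal sketch of that wait()-based variant, again assuming a max_parallel cap so that several fetches are actually in flight at once (waiting after every single task would serialize them):

async def loop(self, max_parallel=10):
    tasks = set()
    result = {}
    for url in self.endless_generator:
        task = asyncio.ensure_future(self.fetch(url))
        task.fetch_url = url
        tasks.add(task)
        # Each wait() call registers a callback on every pending task
        # and tears it down again, which is where the O(n**2) comes from.
        while len(tasks) >= max_parallel:
            done, tasks = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED)
            for t in done:
                result[t.fetch_url] = t.result()
    if tasks:
        done, _ = await asyncio.wait(tasks)  # drain the stragglers
        for t in done:
            result[t.fetch_url] = t.result()
    return result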

Upvotes: 2
