Reputation: 1104
I have code that is very close to this:
import asyncio

class Parser:
    async def fetch(self, url):
        html = await get(url)
        return html

    @property
    def endless_generator(self):
        while True:
            yield url

    async def loop(self):
        htmls = []
        for url in self.endless_generator:
            htmls.append(await self.fetch(url))
        return htmls

    def run(self):
        loop = asyncio.get_event_loop()
        try:
            htmls = loop.run_until_complete(self.loop())
        finally:
            loop.close()
parser = Parser()
parser.run()
Currently Parser.loop runs synchronously.
I've tried asyncio.wait and asyncio.gather to achieve async invocations of Parser.fetch, but I don't know the number of URLs in advance (because the URLs are yielded by the endless generator).
So, how do I get asynchronous calls when the number of tasks is not known in advance?
Upvotes: 0
Views: 216
Reputation: 155515
I've tried asyncio.wait and asyncio.gather to achieve async invocations of Parser.fetch, but I don't know the number of URLs in advance (because the URLs are yielded by the endless generator).
I assume that by endless generator you mean a generator whose number of URLs is not known in advance, rather than a truly endless generator (generating an infinite list). Here is a version that creates a task as soon as a URL is available, and gathers the results as they arrive:
async def loop(self):
    lp = asyncio.get_event_loop()
    tasks = set()
    result = {}
    any_done = asyncio.Event()

    def _task_done(t):
        tasks.remove(t)
        any_done.set()
        result[t.fetch_url] = t.result()

    for url in self.endless_generator:
        new_task = lp.create_task(self.fetch(url))
        new_task.fetch_url = url
        tasks.add(new_task)
        new_task.add_done_callback(_task_done)

    # Wake up whenever any task finishes, until none remain.
    while tasks:
        await any_done.wait()
        any_done.clear()

    return result  # mapping url -> html
One cannot simply call gather or wait in each iteration, because that would wait for all the existing tasks to finish before queuing a new one. wait(return_when=FIRST_COMPLETED) could work, but it would be O(n**2) in the number of tasks, because each call would set up its own callbacks anew.
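To make the pattern concrete, here is a self-contained, runnable sketch of the same approach with a simulated fetch coroutine standing in for the real network call. The names fake_fetch, crawl, and the example URLs are illustrative, not part of the original code:

```python
import asyncio

async def fake_fetch(url):
    # Stand-in for the real network fetch.
    await asyncio.sleep(0.01)
    return f"<html>{url}</html>"

async def crawl(url_gen):
    tasks = set()
    result = {}
    any_done = asyncio.Event()

    def _task_done(t):
        tasks.remove(t)
        any_done.set()
        result[t.fetch_url] = t.result()

    # Create a task for each URL as soon as the generator yields it.
    for url in url_gen:
        task = asyncio.ensure_future(fake_fetch(url))
        task.fetch_url = url
        tasks.add(task)
        task.add_done_callback(_task_done)

    # Drain: wake up whenever any task finishes, until none remain.
    while tasks:
        await any_done.wait()
        any_done.clear()
    return result

urls = (f"http://example.com/{i}" for i in range(3))
htmls = asyncio.run(crawl(urls))
print(sorted(htmls))
```

Because each task registers exactly one done callback, the total bookkeeping stays O(n), and the tasks all run concurrently regardless of how many URLs the generator ends up producing.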
Upvotes: 2