Reputation: 6861
Is there a way to wait on multiple futures, and yield from them as they are completed in a given order?
Imagine you have two data sources. One gives you id -> name
mapping, the other gives you id -> age
mapping. You want to compute (name, age) -> number_of_ids_with_that_name_and_age
.
There is too much data to just load it, but both data sources support paging/iterating and ordering by id
.
So you write something like
def iterate_names():
for page in get_name_page_numbers():
yield from iterate_name_page(page) # yields (id, name) pairs
and the same for age, and then you iterate over iterate_names()
and iterate_ages()
.
What is wrong with that? What happens is:
Basically, you are not waiting for any requests while you process data.
You could use asyncio.gather
to send all requests and wait for all data, but then:
There is asyncio.as_completed
which allows you to send all requests and process pages as you get results, but you will get pages out of order, so you will not be able to do the processing.
Ideally, there would be a function that would make the first request, and as the response comes, make the second request and yield results from the first at the same moment.
Is that possible?
Upvotes: 5
Views: 7423
Reputation: 988
There are a lot of things going on in your question; I'll try to get to all of them.
Is there a way to wait on multiple futures, and yield from them as they are completed in a given order?
Yes. Your code can yield from
or await
any number of futures in sequence. If you are talking about Task
s specifically and you want these tasks to be executing concurrently, they simply need to be assigned to the loop (done when you asyncio.ensure_future()
or loop.create_task()
) and the loop needs to be running.
As for yielding from them in sequence, you can establish what that sequence is in the first place as you create the tasks. In a simple example where you have created all of the tasks/futures before you start to process their results, you could use a list
to store the task futures and finally pull from the list:
loop = asyncio.get_event_loop()
tasks_im_waiting_for = []
for thing in things_to_get:
task = loop.create_task(get_a_thing_coroutine(thing))
tasks_im_waiting_for.append(task)
@asyncio.coroutine
def process_gotten_things(getter_tasks):
for task in getter_tasks:
result = yield from task
print("We got {}".format(result))
loop.run_until_complete(process_gotten_things(tasks_im_waiting_for))
That example will only process one result at a time, but will still allow any of the scheduled getter tasks to continue doing their thing while it's waiting for the next one in the sequence to complete. If the processing order didn't matter as much and we wanted to process more than one potentially-ready result at a time, then we could use a deque
instead of a list
, with more than one process_gotten_things
task .pop()
ing the getter tasks from the deque
. If we wanted to get even more advanced, we can do as Vincent suggests in a comment to your question and use an asyncio.Queue
instead of a deque
. With such a queue, you can have a producer adding tasks to the queue running concurrently with the task-processing consumers.
Using a deque
, or Queue
for sequencing futures for processing has a disadvantage though, and that's that you are only processing as many futures concurrently as you have running processor tasks. You could create a new processor task every single time you queued up a new future to be processed, but at that point, this queue becomes a completely redundant data structure because asyncio already gives you a queue-like object where every thing added gets processed concurrently: the event loop. For every task we schedule, we can also schedule its processing. Revising the above example:
for thing in things_to_get:
getter_task = loop.create_task(get_a_thing_coroutine(thing))
processor_task = loop.create_task(process_gotten_thing(getter_task))
# Tasks are futures; the processor can await the result once started
Now let's say that our getter might return multiple things (kind of like your scenario) and each of those things needs some processing. That brings me to a different asyncio design pattern: sub-tasks. Your tasks can schedule other tasks on the event loop. As the event loop is run, the order of your first tasks will still be maintained, but if any one of them ends up waiting on something, there's a chance one of your sub-tasks will get started in the midst of things. Revising the above scenario, we might pass the loop to our coroutine so the coroutine can schedule the tasks that processes its results:
for thing in things_to_get:
task = loop.create_task(get_a_thing_coroutine(thing, loop))
@asyncio.coroutine
def get_a_thing_coroutine(thing, loop):
results = yield from long_time_database_call(thing)
subtasks = []
for result in results:
subtasks.append(loop.create_task(process_result(result)))
# With subtasks scheduled in the order we like, wait for them
# to finish before we consider THIS task complete.
yield from asyncio.wait(subtasks)
All these advanced patterns start tasks in the order you want, but might finish processing them in any order. If you truly need to process the results in the exact same order that you started getting those results, then stick to a single processor pulling result futures from a sequence or yielding from an asyncio.Queue
.
You'll also notice that to ensure tasks starting in a predictable order, I explicitly schedule them with loop.create_task()
. While asyncio.gather()
and asyncio.wait()
will happily take coroutine objects and schedule/wrap them as Task
s, they have problems with scheduling them in a predictable order as of me writing this. See asyncio issue #432.
OK, let's get back to your specific case. You have two separate sources of results, and those results need to be joined together by a common key, an id
. The patterns I mentioned for getting things and processing those things don't account for such a problem, and I don't know the perfect pattern for it off the top of my head. I'll go through what I might do to attempt this though.
We need some objects to maintain the state of what we know and what we've done so far for the sake of correlating that knowledge as it grows.
# defaultdicts are great for representing knowledge that an interested
# party might want whether or not we have any knowledge to begin with:
from collections import defaultdict
# Let's start with a place to store our end goal:
name_and_age_to_id_count = defaultdict(int)
# Given we're correlating info from two sources, let's make two places to
# store that info, keyed by what we're joining on: id
# When we join correlate this info, only one side might be known, so use a
# Future on both sides to represent data we may or may not have yet.
id_to_age_future = defaultdict(loop.create_future)
id_to_name_future = defaultdict(loop.create_future)
# As soon as we learn the name or age for an id, we can begin processing
# the joint information, but because this information is coming from
# multiple sources we want to process concurrently we need to keep track
# of what ids we've started processing the joint info for.
ids_scheduled_for_processing = set()
We know we'll be getting this information in "pages" via the iterators you mentioned, so let's start there in designing our tasks:
@asyncio.coroutine
def process_name_page(page_number):
subtasks = []
for id, name in iterate_name_page(page_number):
name_future = id_to_name_future[id]
name_future.set_result(name)
if id not in ids_scheduled_for_processing:
age_future = id_to_age_future[id]
task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
subtasks.append(task)
ids_scheduled_for_processing.add(id)
yield from asyncio.wait(subtasks)
@asyncio.coroutine
def process_age_page(page_number):
subtasks = []
for id, age in iterate_age_page(page_number):
age_future = id_to_age_future[id]
age_future.set_result(age)
if id not in ids_scheduled_for_processing:
name_future = id_to_name_future[id]
task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
subtasks.append(task)
ids_scheduled_for_processing.add(id)
yield from asyncio.wait(subtasks)
Those coroutines schedule the name/age pair of an id to be processed—more specifically, the name and age futures for an id. Once started, the processor will await both futures' results (joining them, in a sense).
@asyncio.coroutine
def increment_name_age_pair(id, name_future, age_future):
# This will wait until both futures are resolved and let other tasks work in the meantime:
pair = ((yield from name_future), (yield from age_future))
name_and_age_to_id_count[pair] += 1
# If memory is a concern:
ids_scheduled_for_processing.discard(id)
del id_to_age_future[id]
del id_to_name_future[id]
OK, we've got tasks for getting/iterating the pages and subtasks for processing what's in those pages. Now we need to actually schedule the getting of those pages. Back to your problem, we've got two datasources we want to pull from, and we want to pull from them in parallel. We assume the order of information from one closely correlates to the order of information from another, so we interleave the processing of both in the event loop.
page_processing_tasks = []
# Interleave name and age pages:
for name_page_number, age_page_number in zip_longest(
get_name_page_numbers(),
get_age_page_numbers()
):
# Explicitly schedule it as a task in the order we want because gather
# and wait have non-deterministic scheduling order:
if name_page_number is not None:
page_processing_tasks.append(loop.create_task(process_name_page(name_page_number)))
if age_page_number is not None:
page_processing_tasks.append(loop.create_task(process_age_page(age_page_number)))
Now that we have scheduled the top level tasks, we can finally actually do the things:
loop.run_until_complete(asyncio.wait(page_processing_tasks))
print(name_and_age_to_id_count)
asyncio
may not solve all of your parallel processing woes. You mentioned that the "processing" each page to iterate takes forever. If it takes forever because it's awaiting responses from a server, then this architecture is a neat lightweight approach to do what you need (just make sure the i/o is being done with asyncio loop-aware tools).
If it takes forever because Python is crunching numbers or moving things around with CPU and memory, asyncio's single-threaded event loop doesn't help you much because only one Python operation is happening at a time. In this scenario, you may want to look into using loop.run_in_executor
with a pool of Python interpreter processes if you'd like to stick with asyncio and the sub-task pattern. You could also develop a solution using the concurrent.futures
library with a process pool instead of using asyncio.
Note: The example generator you gave might be confusing to some because it uses yield from
to delegate generation to an inner generator. It just so happens that asyncio coroutines use the same expression to await a future result and tell the loop it can run other coroutines' code if it wants.
Upvotes: 5
Reputation: 17376
asyncio has no such functionality but you may write a simple wrapper around as_completed
for yielding data in-order.
It may be built using small sliding window buffer for storing newer completed data while older result is not available yet.
Upvotes: 1