Reputation: 6287
I have a large number of futures pointing to data which I subsequently need to aggregate remotely in another task:
futures = [client.submit(get_data, i) for i in range(0, LARGE_NUMBER)]
agg_future = client.submit(aggregate, futures)
The issue with the above is that the client warns about the size of the argument being submitted, because of the large number of futures in the list.
If I were willing to pull the data back to the client, I would simply use gather to collect the results:
agg_local = aggregate(client.gather(futures))
This, however, is exactly what I would like to avoid. Is there a way (ideally non-blocking) to effectively gather the futures' results within a remote task, without the client complaining about the size of the list of futures being aggregated?
Upvotes: 0
Views: 332
Reputation: 28683
If your workload really suits the creation of many, many futures and aggregating them in a single function, you can safely ignore the warning and continue.
However, you might find it more efficient to perform something like the tree summation example from the docs. That example is written for dask.delayed, but a client.submit version would look very similar, essentially replacing the line
lazy = add(L[i], L[i + 1])
with
lazy = client.submit(agg_func, L[i], L[i + 1])
but you will have to figure out a version of your aggregation function that can work pairwise and still produce the grand result. This will likely leave a fairly large number of futures in play on the scheduler, which may add latency, so profile to see what works well!
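For concreteness, here is a minimal sketch of the pairwise tree-reduction loop. Plain function calls stand in for client.submit so it runs without a cluster, and agg_func is a hypothetical placeholder for your own pairwise aggregation; in a real cluster each call in the inner loop would be the client.submit line shown above, and only the final future would be gathered.

```python
def agg_func(a, b):
    # Hypothetical pairwise aggregation; replace with your own function.
    # It must be associative so the tree order does not change the result.
    return a + b

# Imagine these are the results behind futures from client.submit(get_data, i)
L = list(range(16))

# Repeatedly combine neighbours until a single value remains.
while len(L) > 1:
    new_L = []
    for i in range(0, len(L) - 1, 2):
        # On a cluster this would be:
        #   new_L.append(client.submit(agg_func, L[i], L[i + 1]))
        new_L.append(agg_func(L[i], L[i + 1]))
    if len(L) % 2:          # odd element count: carry the last one forward
        new_L.append(L[-1])
    L = new_L

result = L[0]  # the grand aggregate
```

Each level halves the number of outstanding tasks, so the reduction finishes in O(log n) rounds instead of funnelling every future into one task.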
Upvotes: 2
Reputation: 1464
I think you could probably do this within a worker:
>>> from dask.distributed import get_client
>>> def f():
...     client = get_client()
...     futures = client.map(lambda x: x + 1, range(10))  # spawn many tasks
...     results = client.gather(futures)
...     return sum(results)
>>> future = client.submit(f)
>>> future.result()
This example is lifted almost directly from the docs on get_client.
Upvotes: 1