The following example doesn't work unless the `asynchronous` keyword is removed from the `LocalCluster` call. I would like to control how many processes/workers are used, run the function calls in parallel, and print each result as soon as it is ready. What needs to be changed?
```python
import time
from dask.distributed import Client, LocalCluster, as_completed

def wait(sec):
    time.sleep(sec)
    return sec

def main():
    cluster = LocalCluster(n_workers=2, threads_per_worker=2, asynchronous=True)
    inputs = [5, 7, 3, 1]
    client = Client(cluster)
    futures = client.map(wait, inputs)
    for future, result in as_completed(futures, with_results=True):
        print(result)
    client.close()

if __name__ == '__main__':
    main()
```
As you suggest, you should remove the `asynchronous=` keyword from your `LocalCluster` call. That keyword is for running Dask from async/await code, like the following:
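```python
import asyncio
from dask.distributed import Client, LocalCluster, as_completed

async def main():
    cluster = await LocalCluster(n_workers=2, threads_per_worker=2, asynchronous=True)
    inputs = [5, 7, 3, 1]
    client = await Client(cluster, asynchronous=True)
    futures = client.map(wait, inputs)  # wait() as defined in the question
    async for future, result in as_completed(futures, with_results=True):
        print(result)
    await client.close()
    await cluster.close()

if __name__ == '__main__':
    asyncio.run(main())
```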
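If you aren't writing async/await code (which is relatively rare), leave out the `asynchronous=` keyword. It does not control parallelism; it tells Dask that the cluster and client will be driven from a running event loop, which is why creating and closing them has to be awaited in the example above.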
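For comparison, here is a minimal sketch of the synchronous version, assuming a recent `dask.distributed` where `LocalCluster` accepts `n_workers`, `threads_per_worker` and `processes`. The helper name `run` is hypothetical. `n_workers` sets how many workers start and `threads_per_worker` how many threads each one has, so `n_workers=2, threads_per_worker=1` runs at most two `wait` calls at once. `as_completed` yields futures in the order they finish, so each result prints as soon as it is ready.

```python
import time
from dask.distributed import Client, LocalCluster, as_completed


def wait(sec):
    # Simulate work that takes `sec` seconds, then return the input.
    time.sleep(sec)
    return sec


def run(inputs, n_workers=2, threads_per_worker=1, processes=True):
    """Run wait() over `inputs` and return the results in completion order."""
    # No `asynchronous=` keyword: cluster and client are used synchronously
    # and are closed automatically when the `with` block exits.
    with LocalCluster(n_workers=n_workers,
                      threads_per_worker=threads_per_worker,
                      processes=processes) as cluster, Client(cluster) as client:
        futures = client.map(wait, inputs)
        finished = []
        # as_completed yields each future as soon as its task finishes,
        # so results are printed in completion order, not input order.
        for future, result in as_completed(futures, with_results=True):
            print(result)
            finished.append(result)
        return finished
```

With worker processes (the default, `processes=True`), call `run([5, 7, 3, 1])` from under an `if __name__ == '__main__':` guard, as in the question, because the worker processes may re-import the main module. Passing `processes=False` runs the workers as threads inside the current process instead.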