Sebastian

Reputation: 396

Dask async processing, print results as they arrive

The following example doesn't work unless I remove the asynchronous keyword from the LocalCluster call. I would like to control how many processes/workers are used, run the functions in parallel, and print each result as soon as it is ready. What needs to be changed?

import time
from dask.distributed import Client, LocalCluster, as_completed


def wait(sec):
    time.sleep(sec)
    return sec


def main():

    cluster = LocalCluster(n_workers=2, ncores=2, asynchronous=True)
    inputs = [5, 7, 3, 1]
    client = Client(cluster)
    futures = client.map(wait, inputs)

    for future, result in as_completed(futures, with_results=True):
        print(result)

    client.close()


if __name__ == '__main__':
    main()

Upvotes: 1

Views: 594

Answers (1)

MRocklin

Reputation: 57281

As you suggest, you should remove the asynchronous= keyword from your LocalCluster call. This keyword is for using Dask from inside async/await code, as in the following:

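import asyncio
from dask.distributed import Client, LocalCluster, as_completed


async def main():
    cluster = await LocalCluster(n_workers=2, threads_per_worker=2, asynchronous=True)
    inputs = [5, 7, 3, 1]
    client = await Client(cluster, asynchronous=True)
    futures = client.map(wait, inputs)  # wait() as defined in the question

    async for future, result in as_completed(futures, with_results=True):
        print(result)

    await client.close()
    await cluster.close()


if __name__ == '__main__':
    asyncio.run(main())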

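If you aren't using async/await syntax (which is relatively rare), leave out the asynchronous= keyword. It probably doesn't do what you think it does: it does not control parallelism, it only makes the cluster and client usable from inside a running asyncio event loop. The number of worker processes and threads is set by n_workers= and threads_per_worker=.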
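For comparison, here is a minimal sketch of the synchronous version of the question's code with asynchronous=True removed. It assumes a reasonably recent Dask release, where threads_per_worker= (rather than ncores=) sets the number of threads in each worker and n_workers= sets the number of worker processes. as_completed hands back each future as soon as it finishes, so results are printed in completion order rather than submission order.

```python
import time

from dask.distributed import Client, LocalCluster, as_completed


def wait(sec):
    # Simulated work that runs on a worker; returns its input so the order is visible
    time.sleep(sec)
    return sec


def main():
    # 2 worker processes x 2 threads each = up to 4 tasks running at once
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    futures = client.map(wait, [5, 7, 3, 1])

    # Iterate in completion order, not in the order the inputs were submitted
    for future, result in as_completed(futures, with_results=True):
        print(result)

    client.close()
    cluster.close()


if __name__ == '__main__':
    main()
```

With four task slots, all four calls can start immediately, so the shorter sleeps should be printed first. To limit parallelism, lower n_workers or threads_per_worker.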

Upvotes: 1
