fuglede

Reputation: 18221

Reporting progress in a Dask task

I'm running a number of slow tasks on a Dask scheduler, and I want progress reports from each task. The tasks will be submitted from the same machine that will handle progress reports, so that could be kept in the same process, but for now let us just assume that tasks are submitted and progress reports handled in separate processes.

Dask provides Coordination Primitives whose intended use cases include being able to monitor progress:

These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.

The simplest example I've been able to conjure that makes use of this is the following:

Task submitter:

from dask.distributed import Client, Pub
import time

c = Client('tcp://127.0.0.1:8786')

def slow_func():
    q = Pub('progress')
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

c.submit(slow_func)

Task reporter:

from dask.distributed import Client, Sub

c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
    print(q.get())

This works with Pub/Sub but would also work equally well with a Queue. Now, even though it does work, it seems like it's not what the authors had in mind: the Pub is created inside the task without an explicit client, and if several tasks run concurrently, there is no way to tell their progress reports apart.
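For reference, the Queue variant mentioned above can be sketched as follows. This is only a sketch: `Client(processes=False)` (an in-process local cluster) stands in for the `tcp://127.0.0.1:8786` scheduler so the example is self-contained, and the queue name `'progress'` and loop counts are arbitrary choices.

```python
import time
from dask.distributed import Client, Queue

# In-process local cluster standing in for an external scheduler.
client = Client(processes=False)

def slow_func():
    # A named distributed Queue is shared through the scheduler,
    # so the same name can be opened from the submitting side.
    q = Queue('progress')
    for i in range(3):
        q.put(f'{i}/3')
        time.sleep(0.1)
    return 'done'

fut = client.submit(slow_func)

# Drain the reports; Queue.get blocks until an item arrives.
reports = [Queue('progress').get() for _ in range(3)]
print(reports)       # ['0/3', '1/3', '2/3']
print(fut.result())  # 'done'
client.close()
```

Unlike Pub/Sub, a Queue delivers each report to exactly one consumer, so this fits a single dedicated reporter process.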

So my, admittedly somewhat vague, question is: As far as creating a "Hello world" style example of a Dask future providing progress reports, how would I modify the above to something that could be considered idiomatic Dask, and are there any pitfalls to be aware of?

I can partially get around the first issue (the implicit client) by creating a new client for each task (example below), but since I end up with something that appears to work just the same, perhaps doing so is unnecessary.

import time
from dask.distributed import Client, Pub

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    c_report = Client('tcp://127.0.0.1:8786')
    q = Pub('progress', client=c_report)
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

c_submit.submit(slow_func)

Upvotes: 1

Views: 469

Answers (1)

fuglede

Reputation: 18221

The first part of the question is answered by the existence of dask.distributed.worker_client, which does exactly what we need: it provides a client talking to the scheduler of the current worker. With that, the task submitter becomes the following:

import time
from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{i}/10')
            time.sleep(1)

c_submit.submit(slow_func)

For the second part, one non-terrible approach would be to simply generate an ID every time the task is submitted. That is, do something like this:

import time
import uuid

from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func(task_id):
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{task_id}: {i}/10')
            time.sleep(1)

c_submit.submit(slow_func, uuid.uuid4())

This works and solves my problem, but it still feels a bit weird to use a new ID when the future already has a perfectly usable one in its key.
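One way to reuse the future's key rather than minting a separate ID is to choose the key yourself, since Client.submit accepts a key= argument; the same string can then be passed into the task. A sketch, again using an in-process cluster (`Client(processes=False)`) in place of the `tcp://127.0.0.1:8786` scheduler:

```python
import time
import uuid
from dask.distributed import Client, Pub, worker_client

c_submit = Client(processes=False)

def slow_func(task_id):
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(3):
            q.put(f'{task_id}: {i}/3')
            time.sleep(0.1)

# Pick the key up front and pass it both as the future's key and as
# the task's argument, so the reports carry the future's own ID.
key = f'slow_func-{uuid.uuid4()}'
fut = c_submit.submit(slow_func, key, key=key)

fut.result()            # wait for the task to finish
print(fut.key == key)   # True
c_submit.close()
```

Note that futures with the same key are deduplicated by the scheduler, so a fresh UUID per submission is still needed if the same function should run more than once.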

Upvotes: 1
