Reputation: 18221
I'm running a number of slow tasks on a Dask scheduler, and I want progress reports from each task. The tasks will be submitted from the same machine that will handle the progress reports, so the two could be kept in the same process, but for now let us just assume that tasks are submitted and progress reports handled in separate processes.
Dask provides Coordination Primitives whose intended use cases include being able to monitor progress:
These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.
The simplest example I've been able to conjure that makes use of this is the following:
Task submitter:
from dask.distributed import Client, Pub
import time
c = Client('tcp://127.0.0.1:8786')
def slow_func():
    q = Pub('progress')
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)
c.submit(slow_func)
Task reporter:
from dask.distributed import Client, Sub
c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
    print(q.get())
This works with Pub/Sub, but would work equally well with a Queue. Now, even though it does work, it seems like it's not what the authors had in mind: the Client I've used to submit the task is also used when doing the reporting; i.e. the Client ends up on the worker nodes. This feels strange. So my, admittedly somewhat vague, question is: As far as creating a "Hello world" style example of a Dask future providing progress reports, how would I modify the above to something that could be considered idiomatic Dask, and are there any pitfalls to be aware of?
I can partially get around my first issue by creating a new client for each task (example below), but since I end up with something that appears to work just the same, perhaps doing so is unnecessary.
import time
from dask.distributed import Client, Pub
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func():
    c_report = Client('tcp://127.0.0.1:8786')
    q = Pub('progress', client=c_report)
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)
c_submit.submit(slow_func)
Upvotes: 1
Views: 469
Reputation: 18221
The first part of the question is answered by the existence of dask.distributed.worker_client, which does exactly what we need: it provides a client talking to the scheduler of the current worker. With that, the task submitter becomes the following:
import time
from dask.distributed import Client, Pub, worker_client
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func():
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{i}/10')
            time.sleep(1)
c_submit.submit(slow_func)
For the second part, one non-terrible approach would be to simply generate an ID every time the task is submitted. That is, do something like this:
import time
import uuid
from dask.distributed import Client, Pub, worker_client
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func(task_id):
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{task_id}: {i}/10')
            time.sleep(1)
c_submit.submit(slow_func, uuid.uuid4())
This works and solves my problem, but it still feels a bit weird to use a new ID when the future already has a perfectly usable one in its key.
Upvotes: 1