Reputation: 4327
My computations with dask.distributed create intermediate files whose names include a UUID4 that identifies that chunk of work:
import os
import uuid

pairs = '{}\n{}\n{}\n{}'.format(list1, list2, list3, ...)
file_path = os.path.join(job_output_root, 'pairs',
                         'pairs-{}.txt'.format(str(uuid.uuid4()).replace('-', '')))
with open(file_path, 'wt') as f:
    f.write(pairs)
At the same time, every task in the dask.distributed cluster already has a unique key, so it would be natural to use that key as the file name.
Is it possible?
Upvotes: 2
Views: 1188
Reputation: 57251
There are two ways to approach the problem:
Functions like .submit accept a key= keyword argument where you can specify the key you want used:
>>> e.submit(inc, 1, key='inc-12345')
<Future: status: pending, key: inc-12345>
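If you choose the key up front, the same string can name both the task and its output file. A minimal sketch of that idea, without a running cluster (the task_key here would be what you pass as key= to e.submit, and job_output_root follows the question's naming):

```python
import os
import uuid

# Assumed output directory, mirroring the question's setup.
job_output_root = 'job_output'

# Pick the key up front; on a real client this same string would be
# passed as e.submit(inc, 1, key=task_key).
task_key = 'inc-{}'.format(uuid.uuid4().hex)

# Reuse the key directly as the file name -- no second identifier needed.
file_path = os.path.join(job_output_root, 'pairs',
                         'pairs-{}.txt'.format(task_key))
```

This way the mapping from task to file is trivial: anything that knows the future's key can locate its output.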
Similarly, dask.delayed functions support a dask_key_name keyword argument:
>>> value = delayed(inc)(1, dask_key_name='inc-12345')
The scheduler places contextual information like this into a per-thread global during the execution of each task. As of version 1.13 it is available as follows:
def your_function(...):
    from distributed.worker import thread_state
    key = thread_state.key
future = e.submit(your_function, ...)
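To illustrate the mechanism, the per-thread global can be emulated with a plain threading.local; in reality distributed.worker.thread_state is populated by the worker around each task invocation, not by user code, so run_task below is only a stand-in for what the worker does:

```python
import threading

# Stand-in for distributed.worker.thread_state, which the real
# worker populates before invoking each task.
thread_state = threading.local()

def run_task(key, func, *args):
    # The worker records the running task's key in the thread-local...
    thread_state.key = key
    try:
        return func(*args)
    finally:
        del thread_state.key

def your_function(x):
    # ...so the task body can read it back to name its output file.
    return 'output-{}.txt'.format(thread_state.key)

result = run_task('inc-12345', your_function, 1)  # 'output-inc-12345.txt'
```

The key is only meaningful while the task is executing on the worker thread, which is why it lives in thread-local rather than module-global state.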
Upvotes: 2