Reputation: 135
I have the following code that walks through each project directory, calls an external executable, and writes the results to out* files.
from dask_jobqueue import PBSCluster
cluster = PBSCluster()
cluster.scale(jobs=3)
from dask.distributed import Client
client = Client(cluster)
...
r_path = '/path/to/project/folder'

def func():
    f = open('out', 'w')  # In project, customized out_file naming based on different dir's
    subprocess.call(["/path/to/executable/file"], stdout=f)

for root, dirs, files in os.walk("."):
    for name in dirs:
        os.chdir(r_path + '/' + str(name))
        func()
This code executes sequentially, but I would like to run it in parallel instead, i.e. each subprocess call on its own dask worker.
Note: the subprocess.call is the same (same executable) for all the different directories.
I have tried this:
def func():
    f = open('out', 'w')  # In project, customized out_file naming based on different dir's
    func.file = (subprocess.call(["/path/to/executable/file"], stdout=f))

arg = [func.file]
workers = client.scheduler_info()['workers']
tasks = [client.submit(func, arg, workers=worker) for arg, worker in zip(args, workers)]
and also this (which probably does not use dask to distribute/parallelise the work):
def func():
    f = open('out', 'w')
    with io.open(f, mode='wb') as out:
        p = subprocess.Popen(["/path/to/executable/file"], stdout=out, stderr=out)
        child_processes.append(p)

for cp in child_processes:
    cp.wait()
but I couldn't parallelise/distribute the subprocess calls.
Could someone please help me parallelise these subprocess calls, one per worker, so that everything runs faster?
Thanks in advance!
Upvotes: 2
Views: 1473
Reputation: 28673
Often, the first non-dask attempt shows the easiest pattern to parallelise. However, I would caution against relying on global state via os.chdir. Instead, refer to the output file by its full path, and pass the working directory to the subprocess:
import os
import subprocess

import dask

r_path = '/path/to/project/folder'

def func(path):
    # Write to a full path and let the subprocess run in `path`
    with open(os.path.join(path, 'out'), 'w') as f:
        subprocess.call(["/path/to/executable/file"], stdout=f, cwd=path)

out = []
for root, dirs, files in os.walk("."):
    for name in dirs:
        path = r_path + '/' + str(name)
        out.append(dask.delayed(func)(path))
dask.compute(*out)
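To see why passing cwd is safer than os.chdir, here is a minimal sketch that can be run without a cluster. It is not the dask version: it swaps in the standard library's ThreadPoolExecutor as the parallel runner, and it uses a child Python process that prints its own working directory as a stand-in for the real executable. The names COMMAND, run_in_dir and run_all are hypothetical, and only immediate subdirectories are visited, which is an assumption about the project layout.

```python
import os
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor

# Stand-in for "/path/to/executable/file" (assumption, for illustration only):
# a child Python process that prints its own working directory.
COMMAND = [sys.executable, "-c", "import os; print(os.getcwd())"]


def run_in_dir(path):
    """Run COMMAND with `path` as its working directory; write stdout to path/out."""
    out_file = os.path.join(path, "out")
    with open(out_file, "w") as f:
        # cwd= only affects the child process, so nothing global is mutated.
        subprocess.call(COMMAND, stdout=f, cwd=path)
    return out_file


def run_all(root):
    """Run COMMAND once per immediate subdirectory of `root`, concurrently."""
    paths = sorted(
        os.path.join(root, name)
        for name in os.listdir(root)
        if os.path.isdir(os.path.join(root, name))
    )
    with ThreadPoolExecutor(max_workers=3) as pool:
        return list(pool.map(run_in_dir, paths))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        for name in ("a", "b", "c"):
            os.mkdir(os.path.join(root, name))
        before = os.getcwd()
        for out_file in run_all(root):
            with open(out_file) as f:
                print(out_file, "->", f.read().strip())
        # The parent's working directory was never changed.
        assert os.getcwd() == before
```

Because each call carries its own path, the same function should also be usable with dask.delayed as above; with os.chdir, tasks sharing a process would overwrite each other's working directory.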
Upvotes: 4