ranjith

Reputation: 135

Parallelize/distribute subprocess call in dask each for one worker?

I have the following code that walks through each of the project directories, calls an external executable, and writes the results to out* files.

import os
import subprocess

from dask_jobqueue import PBSCluster
cluster = PBSCluster()
cluster.scale(jobs=3)

from dask.distributed import Client
client = Client(cluster)
...

r_path='/path/to/project/folder'


def func():
    f = open('out', 'w')  # in the project, out-file naming is customized per directory
    subprocess.call(["/path/to/executable/file"], stdout=f)

for root, dirs, files in os.walk("."):
    for name in dirs:
        os.chdir(r_path+'/'+str(name))
        func()

This code executes sequentially, but I would like to run it in parallel, i.e. each subprocess call on its own dask worker.

Note: I run the same subprocess.call (same executable) in each of the different directories.

I have tried this

def func():
    f = open('out', 'w')  # in the project, out-file naming is customized per directory
    func.file = subprocess.call(["/path/to/executable/file"], stdout=f)


arg = [func.file]
workers = client.scheduler_info()['workers']
tasks = [client.submit(func, arg, workers=worker) for arg, worker in zip(args, workers)]

and also this (probably not using dask to distribute/parallelise at all)

def func():
    f = open('out', 'w')
    with io.open(f, mode='wb') as out:
        p = subprocess.Popen(["/path/to/executable/file"], stdout=out, stderr=out)
        child_processes.append(p)

for cp in child_processes:
    cp.wait()

but couldn't parallelise/distribute the subprocess calls.

Could someone please help me parallelise these subprocess calls, one per worker, so that things execute faster?

Thanks in advance !

Upvotes: 2

Views: 1473

Answers (1)

mdurant

Reputation: 28673

Often, the first non-dask attempt shows the easiest pattern to parallelise. However, I would caution against using global state with os.chdir; instead, refer to the output file by its full path, and pass the working directory to the subprocess:

import os
import subprocess

import dask

r_path = '/path/to/project/folder'

def func(path):
    # Open the output file by full path and let subprocess set the
    # working directory itself, so no process-global chdir is needed.
    with open(os.path.join(path, 'out'), 'w') as f:
        subprocess.call(["/path/to/executable/file"], stdout=f, cwd=path)

out = []
for root, dirs, files in os.walk("."):
    for name in dirs:
        path = os.path.join(r_path, name)
        out.append(dask.delayed(func)(path))

dask.compute(*out)
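The cwd idea can be checked without dask at all. Here is a minimal stdlib sketch, where the `pwd` command stands in for your real executable and a temporary directory stands in for a project dir (both are my assumptions, not part of your setup); it shows the child process runs in the directory it is given while the parent never calls os.chdir:

```python
import os
import subprocess
import tempfile

def run_in_dir(path):
    """Run the executable with its working directory set to `path`,
    writing its stdout to an 'out' file inside that directory."""
    out_file = os.path.join(path, "out")
    with open(out_file, "w") as f:
        # "pwd" is a stand-in for the real executable; it prints the
        # working directory the child process was given.
        subprocess.call(["pwd"], stdout=f, cwd=path)
    return out_file

with tempfile.TemporaryDirectory() as tmp:
    child_cwd = open(run_in_dir(tmp)).read().strip()
    # The child ran inside `tmp` even though this process never chdir'd.
    print(child_cwd == os.path.realpath(tmp))  # prints True
```

Under dask, each delayed `func(path)` call does exactly this on whichever worker it lands on, so no per-task chdir bookkeeping is required.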

Upvotes: 4
