I am building a simple example to understand how Dask distributed can distribute Python scripts on an HPC cluster. The function to distribute is a basic operation that writes a file to disk. This script works fine when run from the command line ($ python simple-function.py):
import os
import argparse
import time
def inc(x):
    time.sleep(1)
    return x + 1
def get_args():
    """
    Get args
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("x", help="Value")
    args = parser.parse_args()
    x = args.x
    return int(x)
if __name__ == "__main__":
    x = get_args()
    print("{0} + 1 = {1}".format(x, inc(x)))
    with open("results.txt", 'w') as file:
        file.write(str(x) + '\n')
Now I have created another Python script that distributes this code. The idea is to use subprocess together with client.map or client.submit to launch multiple instances of the script above. The problem I run into is that the output .txt file is not written with any of the methods below (client.map, or client.submit followed by gather, compute or .result()). Maybe I'm not using the correct method?
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
def create_cluster():
    cluster = PBSCluster(
        cores=4,
        memory="20GB",
        interface="ib0",
        queue="qdev",
        processes=4,
        nanny=True,
        walltime="12:00:00",
        shebang="#!/bin/bash",
        local_directory="$TMPDIR"
    )
    cluster.scale(4)
    time.sleep(10)  # Wait for workers
    return cluster
def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("script", help="Script to distribute")
    parser.add_argument("nodes", type=int, help="Number of nodes")
    args = parser.parse_args()
    script = args.script
    n_nodes = args.nodes
    return script, n_nodes
def close_all(client, cluster):
    client.close()
    cluster.close()
def methode(script, x):
    subprocess.run(["python",
                    script,
                    x])
    return None
if __name__ == "__main__":
    cluster = create_cluster()
    client = Client(cluster)
    time.sleep(1)
    script, n_nodes = get_args()  # Get arguments
    # With client.submit
    futures = []
    for n, o in enumerate(range(10)):
        futures.append(client.submit(methode, *[script, str(o)], priority=-n))
    [f.result() for f in futures]
    # Or client.map
    L = client.map(methode, *[script, str(range(10))])
    client.compute(L)
    client.gather(L)
    time.sleep(20)
    close_all(client, cluster)
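One likely problem with the client.map call above: Client.map takes one iterable per positional argument (the Dask docs say they should have the same length) and calls the function element by element, much like Python's built-in map. Passing the two plain strings script and str(range(10)) therefore walks over their characters instead of calling methode once per value. The built-in map reproduces the pairing without a cluster; fake_methode is a hypothetical stand-in that only returns the command it would run.

```python
# Stand-in for methode: returns the command line instead of running it
# (hypothetical helper, used only to show how map pairs up arguments).
def fake_methode(script, x):
    return ["python", script, x]


script = "simple-function.py"

# The question's call: both arguments are plain strings, so map walks them
# character by character and stops at the shorter one.
print(str(range(10)))  # range(0, 10)
buggy = list(map(fake_methode, *[script, str(range(10))]))
print(buggy[:2])  # [['python', 's', 'r'], ['python', 'i', 'a']]

# One iterable per argument: repeat the script name, vary x.
fixed = list(map(fake_methode, [script] * 10, [str(o) for o in range(10)]))
print(fixed[:2])  # [['python', 'simple-function.py', '0'], ['python', 'simple-function.py', '1']]
```

With the real client, the same shape would be L = client.map(methode, [script] * 10, [str(o) for o in range(10)]) followed by client.gather(L).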
On a side note, if I execute the following code: dask.compute(methode(args))
then the .txt output file is written.
It seems that only the client methods are not working.
So I tried what you have with some minor changes, and it pretty much worked for me. I took out the HPC parts because I figured the question was about dask, not dask-jobqueue, and focused on getting just the futures working. Then I noticed that your simple-function.py overwrites all the results, since every run writes to the same results.txt, so I changed that. The output is results0.txt to results9.txt in my current directory (not my home directory, where the scripts are located).
distribute.py:
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("script", help="Script to distribute")
    parser.add_argument("nodes", type=int, help="Number of nodes")
    args = parser.parse_args()
    script = args.script
    n_nodes = args.nodes
    return script, n_nodes
def methode(script, x):
    subprocess.run(["python",
                    script,
                    x])
    return None
if __name__ == "__main__":
    client = Client()
    script, n_nodes = get_args()  # Get arguments
    futures = []
    for n, o in enumerate(range(10)):
        futures.append(client.submit(methode, *[script, str(o)], priority=-n))
    results = client.gather(futures)
    client.close()
simple-function.py:
import os
import argparse
import time
def inc(x):
    time.sleep(1)
    return x + 1
def get_args():
    """
    Get args
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("x", help="Value")
    args = parser.parse_args()
    x = args.x
    return int(x)
if __name__ == "__main__":
    x = get_args()
    print("{0} + 1 = {1}".format(x, inc(x)))
    with open(f"results{x}.txt", 'w') as file:
        file.write(str(x + 1) + '\n')
Edit: Oh, I just read your answer. If you execute that code from a Jupyter cell, it is executed from the directory of the notebook, which is probably your home directory.
I finally found that the path to the script being executed was incorrect.
If the following command is run from the command line:
$ python distribute.py simple-function.py 6
somehow the path is incorrect and the file is not found.
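Because methode ignores the result of subprocess.run, a missing script fails silently on the worker: the child process prints its error to the worker's stderr (typically the worker log), exits with a non-zero status, and the future still completes with None. A minimal sketch that makes the failure visible, assuming it is acceptable for the task to raise: check=True turns a non-zero exit status into subprocess.CalledProcessError, and capture_output=True keeps stderr so it travels with the exception. run_script is a hypothetical name, and sys.executable replaces the bare "python" so the child uses the same interpreter as the process calling it.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_script(script, x):
    """Run `script x` and raise if the child process fails (hypothetical helper)."""
    completed = subprocess.run(
        [sys.executable, script, str(x)],
        check=True,           # non-zero exit status -> CalledProcessError
        capture_output=True,  # keep stdout/stderr instead of losing them
        text=True,            # decode output as str
    )
    return completed.stdout


# Demonstration: a script path that does not exist.
with tempfile.TemporaryDirectory() as tmp:
    missing = Path(tmp) / "does-not-exist.py"
    try:
        run_script(str(missing), 8)
        failure = None
    except subprocess.CalledProcessError as err:
        failure = err

# Python reports the missing file on stderr and exits with a non-zero status.
print(failure.returncode)
print(failure.stderr)
```

Submitted the same way as methode, for example client.submit(run_script, script, str(o)), a failing run makes f.result() re-raise the CalledProcessError on the client instead of quietly returning None.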
I have tried running this in a notebook. If I execute:
methode(*[script, "8"])
the file is written in the current directory. However, if I execute this cell:
from dask.distributed import wait
futures = []
for n, o in enumerate(range(12)):
    print(o)
    futures.append(client.submit(methode, *[script, str(o)], priority=-n))
wait(futures)
print("Done !")
the result file is not written in the current directory but in my home directory.
I am not quite sure yet why this happens, but it seems that something changes in the path when using the client...
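A likely explanation, assuming the workers were started by PBS jobs: relative paths given to subprocess.run, both the script name and the results.txt that the child opens, are resolved against the current working directory of the process that runs methode. Called directly in the notebook, that is the notebook's directory; on a Dask worker it is the directory the worker started in, which for PBS jobs is usually the home directory. A minimal sketch that removes the dependence on the worker's directory: resolve paths on the client with os.path.abspath before submitting, and pass the desired output directory as cwd. run_in_dir is a hypothetical helper, and the paths must be visible from the compute nodes (a shared filesystem is assumed).

```python
import os
import subprocess
import sys
import tempfile
from pathlib import Path


def run_in_dir(script, x, workdir):
    """Run `script x` with `workdir` as the child's working directory (hypothetical helper).

    `script` and `workdir` should already be absolute paths computed on the
    client: calling abspath here, on a worker, would use the worker's cwd.
    """
    subprocess.run([sys.executable, script, str(x)], cwd=workdir, check=True)
    return os.path.join(workdir, "results.txt")


# Demonstration with a tiny stand-in for simple-function.py.
with tempfile.TemporaryDirectory() as tmp:
    script_path = Path(tmp) / "simple-function.py"
    script_path.write_text(
        "import sys\n"
        "with open('results.txt', 'w') as f:\n"
        "    f.write(sys.argv[1] + '\\n')\n"
    )
    outdir = Path(tmp) / "out"
    outdir.mkdir()

    # Resolve on the "client" side, then hand absolute paths to the task.
    result_file = run_in_dir(os.path.abspath(script_path), 8, os.path.abspath(outdir))
    content = Path(result_file).read_text()

print(content.strip())  # 8
```

In distribute.py the client side would then do script = os.path.abspath(script) before the submit loop and call client.submit(run_in_dir, script, str(o), os.getcwd()), where os.getcwd() is evaluated on the client. To check where each worker actually runs, client.run(os.getcwd) returns a dict mapping worker addresses to their working directories.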