Reputation: 117
I am trying to run a simple parallel program on a SLURM cluster (4x Raspberry Pi 3), but without success. I have been reading about it, but I just cannot get it to work. The problem is as follows:
I have a Python program named remove_duplicates_in_scraped_data.py. This program is executed on a single node (one node = one Raspberry Pi), and inside the program there is a multiprocessing section that looks something like this:
pool = multiprocessing.Pool()
input_iter = product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
The idea is that, when the program reaches that section, it should distribute the calculations, one worker process per element of the iterator, and combine the results at the end. So the program remove_duplicates_in_scraped_data.py runs once (not multiple times) and only fans out into multiple worker processes during the pool calculation.
On a single machine (without SLURM) it works just fine; for the particular case of a Raspberry Pi, it spawns 4 worker processes, does the calculations, saves them in results and continues the program as a single process.
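A self-contained toy version of the pool section behaves the same way on one machine (refact_featureMatch here is a hypothetical stand-in for the real matching function):

import multiprocessing
from itertools import product

def refact_featureMatch(a, b):
    # hypothetical stand-in for the real matching function
    return a == b

if __name__ == '__main__':
    FeaturesArray_1 = [1, 2, 3]
    FeaturesArray_2 = [2, 3, 4]
    pool = multiprocessing.Pool()  # one worker per local CPU by default
    input_iter = product(FeaturesArray_1, FeaturesArray_2, repeat=1)
    results = pool.starmap(refact_featureMatch, input_iter)
    print(results)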
I would like to exploit all 16 CPUs of the SLURM cluster, but I cannot seem to get this to work. And I am confident that the cluster has been configured correctly, since it can run all the usual multiprocessing examples (e.g. calculating the digits of pi) under SLURM on all 16 CPUs of the cluster.
Now, looking at the SLURM configuration with sinfo -N -l
we have:
NODELIST NODES PARTITION STATE CPUS S:C:T MEMORY TMP_DISK WEIGHT AVAIL_FE REASON
node01 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node02 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node03 1 picluster* idle 4 4:1:1 1 0 1 (null) none
node04 1 picluster* idle 4 4:1:1 1 0 1 (null) none
Each node reports 4 sockets, 1 core per socket and 1 thread per core, i.e. 4 CPUs as far as SLURM is concerned.
I wish to exploit all 16 CPUs, and if I run my program as:
srun -N 4 -n 16 python3 remove_duplicates_in_scraped_data.py
it just runs 4 copies of the main program on each node, resulting in 16 independent processes. But this is not what I want. I want a single instance of the program, which then spawns the 16 workers across the cluster. At least we know that the cluster works with srun -N 4 -n 16.
So instead I tried changing the program as follows:
#!/usr/bin/python3
#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --sockets-per-node=4
import os
import sys

sys.path.append(os.getcwd())
...
...
...
pool = multiprocessing.Pool()
input_iter = product(FeaturesArray_1, FeaturesArray_2, repeat=1)
results = pool.starmap(refact_featureMatch, input_iter)
...
...
and executing it with
sbatch remove_duplicates_in_scraped_data.py
The SLURM job is created successfully, and I can see that all nodes of the cluster have been allocated:
PARTITION AVAIL TIMELIMIT NODES STATE NODELIST
picluster* up infinite 4 alloc node[01-04]
The program starts running as a single process on node01, but when it hits the parallel section it only spawns 4 worker processes on node01 and nothing on the other nodes.
I tried different combinations of settings, and even tried to run it via a wrapper script:
#!/bin/bash
#SBATCH -p picluster
#SBATCH --nodes=4
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=1
#SBATCH --ntasks-per-node=4
#SBATCH --ntasks-per-socket=1
#SBATCH --ntasks-per-core=1
#SBATCH --sockets-per-node=4
python3 remove_duplicates_in_scraped_data.py
but I just cannot get it to spawn workers on the other nodes.
Can you please help me? Is this even possible, i.e. can Python's multiprocessing pool be used across different nodes of a cluster? If not, what other options do I have? The cluster also has Dask configured; would that work better?
Please help as I am really stuck with this.
Thanks
Upvotes: 5
Views: 3053
Reputation: 117
So instead I ran Dask with the SLURM cluster, and the Python script seems to parallelise well. This required the fewest code changes. The multiprocessing pool code above was changed to:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(header_skip=['--mem'],
                       queue='picluster',
                       cores=4,
                       memory='1GB')
cluster.scale(cores=16)  # request enough workers to cover 16 cores in total
dask_client = Client(cluster)

lazy_results = []
for pair in input_iter:
    res = dask_client.submit(refact_featureMatch, pair[0], pair[1])
    lazy_results.append(res)
results = dask_client.gather(lazy_results)
There might of course be better ways of doing this via Dask. I am open to suggestions :)
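For instance, since Client.map accepts multiple iterables (like the built-in map), the submit loop above can probably be collapsed into a single call. A sketch, assuming input_iter is small enough to materialise in memory:

xs, ys = zip(*input_iter)  # unzip the pairs into two argument tuples
futures = dask_client.map(refact_featureMatch, xs, ys)
results = dask_client.gather(futures)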
Upvotes: 2
Reputation: 1685
Python's multiprocessing package is limited to shared-memory parallelization: it spawns new processes that all have access to the main memory of a single machine.
You cannot simply scale such software out onto multiple nodes, because the different machines do not have a shared memory that they can access.
To run your program on multiple nodes at once, you should have a look at MPI (Message Passing Interface). There is also a Python package for that (mpi4py).
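For example, mpi4py's MPIPoolExecutor mirrors the multiprocessing pool interface while distributing the workers across nodes. A minimal sketch, with a hypothetical stand-in for refact_featureMatch:

# Launch with something like: srun -n 16 python3 -m mpi4py.futures script.py
from itertools import product
from mpi4py.futures import MPIPoolExecutor

def refact_featureMatch(a, b):
    # hypothetical stand-in for the real matching function
    return a == b

if __name__ == '__main__':
    FeaturesArray_1 = [1, 2, 3]
    FeaturesArray_2 = [2, 3, 4]
    with MPIPoolExecutor() as executor:
        results = list(executor.starmap(refact_featureMatch,
                                        product(FeaturesArray_1, FeaturesArray_2)))
        print(results)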
Depending on your task, it may also be suitable to run the program 4 times (one job per node) and have each instance work on a subset of the data. This is often the simpler approach, but it is not always possible.
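With SLURM, this pattern maps naturally onto a job array (sbatch --array=0-3). A sketch of how each instance could select its share of the work (the feature arrays are placeholders):

# Submitted 4 times via a job array; each instance processes every
# num_tasks-th pair, offset by its own array task id.
import os
from itertools import product

FeaturesArray_1 = [1, 2, 3]  # placeholder data
FeaturesArray_2 = [2, 3, 4]

task_id = int(os.environ.get('SLURM_ARRAY_TASK_ID', 0))
num_tasks = int(os.environ.get('SLURM_ARRAY_TASK_COUNT', 1))

pairs = list(product(FeaturesArray_1, FeaturesArray_2))
my_pairs = pairs[task_id::num_tasks]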
Upvotes: 4