Reputation: 39
Machine: 48 cores, 96 threads, RAM 256GB
System: Ubuntu 20.04
Python: 3.9
I have a Python script for data processing and analysis, and an input dataset containing around 40,000 files for the job.
The script is run as python a.py -i sample_list.txt -o /path/to/outdir.
sample_list.txt contains the file prefixes of all 40,000 files in the dataset; each prefix represents a sample ID. In Python, it is imported as a list. /path/to/outdir defines the output directory: the software first creates a new folder in the output directory for each prefix, then writes the generated data into those folders sample by sample.
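(For illustration, the import amounts to roughly the following sketch, assuming one prefix per line in sample_list.txt:)
with open("sample_list.txt") as f:
    name = f.read().splitlines()   # e.g. ['sample1', 'sample2', ...]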
I found that this script analyzes the data one sample at a time. I estimated the runtime: it would need nearly 240 days to finish all the jobs for this dataset, which is unacceptable. I think parallelizing the job submission could speed things up, and that's when Snakemake came into my sight.
I did some study on Snakemake. In my case, I can provide three things for the Snakemake file:
input: "sample_list.txt"
output: "/path/to/outdir"
shell: "python a.py -i {input} -o {output}"
But I have a question: if I provide a sample list file as input, the script reads the file prefixes and imports them directly, instead of matching an input file pattern such as {input}_1.txt. Is it possible to parallelize the jobs based on sample_list.txt, or must I define an input file pattern for Snakemake's input?
Thanks
Additional:
Here is an example.
The filenames look like: sample1_1.fq, sample1_2.fq, sample2_1.fq, sample2_2.fq, etc.
The software requires a list: name = ['sample1','sample2','sample3','sample4']
To get all the sample names (file prefixes), I extracted them and stored them in sample_list.txt (a sketch of the extraction follows the list below):
sample1
sample2
sample3
sample4
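For completeness, the extraction step can be sketched like this (the fastq directory path is a placeholder, and the prefix is taken as everything before the last underscore):
import os

prefixes = set()
for fname in os.listdir("/path/to/fastq"):      # placeholder directory
    base = os.path.splitext(fname)[0]           # e.g. 'sample1_1'
    prefixes.add(base[:base.rfind('_')])        # e.g. 'sample1'

with open("sample_list.txt", "w") as out:
    out.write("\n".join(sorted(prefixes)) + "\n")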
How can I parallelize the jobs?
Edited:
import argparse
import multiprocessing
import os

def multiprocess(data_path, out_dir, run_name):
    # leave two cores free for the system
    max_cores = multiprocessing.cpu_count() - 2
    pool = multiprocessing.Pool(processes=max_cores)  # , maxtasksperchild=4

    # derive the sample prefixes from the file names (everything before the last '_')
    sample_list = []
    for i in os.listdir(data_path):
        portion = os.path.splitext(i)
        tmp_out = portion[0][0:portion[0].rfind('_')]
        sample_list.append(tmp_out)
    sample_list = sorted(set(sample_list))

    # submit one asynchronous job per sample; main() is the per-sample
    # processing function defined elsewhere in the script
    for sample in sample_list:
        pool.apply_async(main, args=(data_path, out_dir, run_name, [sample]))

    print('Wait for all subprocesses done...')
    pool.close()
    pool.join()
    print('All subprocesses done!')

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Run single sample")
    parser.add_argument('-d', '--data', action='store', dest='data', help="Data path", required=True)
    parser.add_argument('-o', '--outdir', action='store', dest='outdir', help="Output directory", required=True)
    parser.add_argument('-r', '--runname', action='store', dest='runname', help="Define the group", required=True)
    args = parser.parse_args()
    multiprocess(data_path=args.data, out_dir=args.outdir, run_name=args.runname)
I loaded 11 samples and it quickly exhausted the RAM. Is there a way to improve this?
Upvotes: 1
Views: 435
Reputation: 3368
You don't necessarily need Snakemake to achieve parallelization.
If you wish to use Snakemake, you need to define a rule that processes only one sample. The rule you describe would treat all samples at once, and would therefore run only once, without any parallelization.
The Snakemake way:
with open("sample_list.txt", 'r') as f:
names = f.read().splitlines()
rule all:
input: expand("/path/to/outdir/{sample}/{sample}_processedFile.txt",sample=names)
rule process:
input: fastq1 = "/path/to/fastq/{sample}_1.fq",
fastq2 = "/path/to/fastq/{sample}_2.fq"
output: "/path/to/outdir/{sample}/{sample}_processedFile.txt"
params: outdir = "/path/to/outdir/{sample}"
shell: "python a.py -i {input.fastq1} {input.fastq2} -o {params.outdir}"
This assumes:
- a.py takes as arguments a pair of fastq files, or a sample name if the script knows how to find the fastq files from it.
- a.py processes only one sample at a time.
And you would run Snakemake this way:
snakemake -j X
where X is the number of parallel jobs (do not exceed the number of cores on the machine)
Remember that all Python code outside of the rules is executed before building the DAG. However, I'm not completely sure Snakemake will be able to build its DAG for 40k files...
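One way to check that without running anything is to slice the sample list at the top of the Snakefile and do a dry run (a sketch):
with open("sample_list.txt", 'r') as f:
    names = f.read().splitlines()
names = names[:100]   # build the DAG for a subset first
Then snakemake -n -j X performs a dry run: it builds the DAG and lists the jobs without executing them.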
The Python way:
You can use the Python module multiprocessing:
import multiprocessing

def processSample(sample):
    # process your sample here
    ...

# get the maximum number of cores on the machine (keep 1 or more free so as not to overload it)
maxCores = multiprocessing.cpu_count() - 1
# build a pool of workers
pool = multiprocessing.Pool(processes=maxCores)
# add one job per sample to the pool (names is the list read from sample_list.txt)
for sample in names:
    pool.apply_async(processSample, args=(sample,))  # args must be a tuple, hence the trailing comma
pool.close()
pool.join()
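If a.py only knows how to read a list file, one possible processSample is to call the existing script once per sample (a sketch; the single-entry list file and the hard-coded paths are assumptions, not something your script necessarily supports):
import subprocess
from pathlib import Path

OUTDIR = Path("/path/to/outdir")   # placeholder

def processSample(sample):
    # write a one-line list file for this sample (assumes a.py accepts any list file)
    list_file = OUTDIR / (sample + ".list.txt")
    list_file.write_text(sample + "\n")
    # run the existing script on that single sample
    subprocess.run(["python", "a.py", "-i", str(list_file), "-o", str(OUTDIR)], check=True)

# names is read the same way as in the Snakefile above
with open("sample_list.txt") as f:
    names = f.read().splitlines()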
Upvotes: 2