Reputation: 696
I have a Python script heavy_lifting.py that I have parallelized using GNU Parallel, called from a bash wrapper script wrapper.sh. I use this to process fastq-formatted files; see example.fastq below. While this works, it is inelegant to require two interpreters and two sets of dependencies. I would like to rewrite the bash wrapper script in Python while achieving the same parallelization.
example.fastq
This is an example of an input file that needs to be processed. This input file is often very long (~500,000,000 lines).
@SRR6750041.1 1/1
CTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
+
AAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6
@SRR6750041.2 2/1
CTATANTATTCTATATTTATTCTAGATAAAAGCATTCTATATTTAGCATATGTCTAGCAAAAAAAA
+
AAAAA#EE6EEEEEEEEEEEEAAEEAEEEEEEEEEEEE/EAE/EAE/EA/EAEAAAE//EEAEAA6
@SRR6750041.3 3/1
ATCCANAATGATGTGTTGCTCTGGAGGTACAGAGATAACGTCAGCTGGAATAGTTTCCCCTCACAG
+
AAAAA#EE6E6EEEEEE6EEEEAEEEEEEEEEEE//EAEEEEEAAEAEEEAE/EAEEA6/EEA<E/
@SRR6750041.4 4/1
ACACCNAATGCTCTGGCCTCTCAAGCACGTGGATTATGCCAGAGAGGCCAGAGCATTCTTCGTACA
+
/AAAA#EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEAE/E/<//AEA/EA//E//
Below are minimal reproducible examples of the scripts I am starting out with.
heavy_lifting.py
#!/usr/bin/env python
import argparse
# Read in arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()
# Iterate through input file and append to output file
with open(args.inputFastq, "r") as infile:
    with open(args.outputFastq, "a") as outfile:
        for line in infile:
            outfile.write("modified" + line)
wrapper.sh
#!/bin/bash
NUMCORES="4"
FASTQ_F="./fastq_F.fastq"
# split the input fastq for parallel processing. One split fastq file will be created for each core available.
split --number="l/$NUMCORES" $FASTQ_F split_fastq_F_
# Feed split fastq files to GNU Parallel to invoke parallel executions of `heavy_lifting.py`
ls split_fastq_F* | awk -F "split_fastq_F" '{print $2}' | parallel "python heavy_lifting.py -i split_fastq_F{} -o output.fastq"
#remove intermediate split fastq files
rm split_fastq_*
To execute these scripts I use the command bash wrapper.sh. You can see that a results file output.fastq is created and contains a modified fastq file.
Below is my attempt to invoke parallel processing using a python wrapper wrapper.py.
wrapper.py
#!/usr/bin/env python
import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing
numcores = 4
fastq_F = "fastq_F.fastq"
#Create some logic to split the input fastq file into chunks for parallel processing.
# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
    lines = infile.readlines()

split_size = length_fastq / numcores
print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            counter += 1
        elif counter <= split_size:
            outfile.write(line.strip())
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()

Parallel(n_jobs=numcores)(delayed(heavy_lifting)(i, "output.fastq") for i in split_fastq_list)
I seem to be most confused about how to properly feed the input arguments into the invocation of "Parallel" in the python wrapper.py script. Any help is much appreciated!
Upvotes: 0
Views: 514
Reputation: 67
This article might be helpful. Proposed solution:
import concurrent.futures
import os
from functools import wraps


def make_parallel(func):
    """
    Decorator used to decorate any function which needs to be parallelized.
    The input of the decorated function should be a list in which each element
    is an instance of input for the normal function.
    You can also pass in keyword arguments separately.
    :param func: function
        The instance of the function that needs to be parallelized.
    :return: function
    """
    @wraps(func)
    def wrapper(lst):
        """
        :param lst:
            The inputs of the function in a list.
        :return:
        """
        # The maximum number of threads that can be spawned.
        # If the number of threads is too high, the overhead of creating them becomes significant.
        # Here we take the number of CPUs available in the system and multiply it by a constant.
        # On my system I have a total of 8 CPUs, so at most 16 threads will be created.
        number_of_threads_multiple = 2  # You can change this multiple according to your requirements
        number_of_workers = int(os.cpu_count() * number_of_threads_multiple)
        if len(lst) < number_of_workers:
            # If the list is short, we only need that many threads,
            # so we avoid creating unnecessary ones.
            number_of_workers = len(lst)

        if number_of_workers:
            if number_of_workers == 1:
                # If the list that needs to be parallelized has only one element,
                # there is no point in parallelizing, so we run it serially.
                result = [func(lst[0])]
            else:
                # Core code: create the maximum number of threads and run the decorated function in parallel.
                result = []
                with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
                    bag = {executor.submit(func, i): i for i in lst}
                    for future in concurrent.futures.as_completed(bag):
                        result.append(future.result())
        else:
            result = []
        return result
    return wrapper
example of usage:
# Parallelized way of calling the function
results = make_parallel(sample_function)(list_of_post_ids)
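As a rough sketch of how this could be applied to the question's split files (the names process_chunk and split_fastq_list below are assumptions for illustration, not part of the article):
# Hypothetical adaptation to the question's use case: each list element is
# one split fastq file, and the decorated function takes a single argument.
def process_chunk(input_fastq):
    # Append the "modified" lines of one chunk to a shared output file.
    # Note: concurrent appends from several threads may interleave records,
    # so writing one output file per chunk is safer in practice.
    with open(input_fastq) as infile, open("output.fastq", "a") as outfile:
        for line in infile:
            outfile.write("modified" + line)

split_fastq_list = ["split_fastq_F_0", "split_fastq_F_1",
                    "split_fastq_F_2", "split_fastq_F_3"]
results = make_parallel(process_chunk)(split_fastq_list)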
Upvotes: 0
Reputation: 33685
This should be a comment, because it does not answer the question, but it is too big.
All of wrapper.sh can be written as:
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --cat "python heavy_lifting.py -i {} -o output.fastq"
If heavy_lifting.py only reads the file and does not seek, this should work too, and will require less disk I/O (the temporary file is replaced with a fifo):
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --fifo "python heavy_lifting.py -i {} -o output.fastq"
It will autodetect the number of CPU threads, split the fastq file at a line that starts with @SRR, split it into one chunk per CPU thread on the fly, and give that to python.
If heavy_lifting.py reads from stdin when no -i is given, then this should work, too:
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output.fastq"
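This stdin fallback is not part of the original heavy_lifting.py; a minimal sketch of how it could be added (the argument handling is an assumption) might look like:
#!/usr/bin/env python
# Hypothetical variant of heavy_lifting.py: read from stdin when no -i is given,
# so it can be fed directly by parallel --pipepart as shown above.
import argparse
import sys

parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', help='forward .fastq (defaults to stdin)')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

infile = open(args.inputFastq, "r") if args.inputFastq else sys.stdin
with open(args.outputFastq, "a") as outfile:
    for line in infile:
        outfile.write("modified" + line)
if args.inputFastq:
    infile.close()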
If heavy_lifting.py does not append a unique string to output.fastq, it will be overwritten. So it might be better to have GNU Parallel give it a unique name like output2.fastq:
parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output{#}.fastq"
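If a single combined file is still needed afterwards, one option (a sketch, assuming the output{#}.fastq naming above) is to concatenate the per-chunk files, for example:
# Hypothetical post-processing step: merge output1.fastq, output2.fastq, ...
# (produced by output{#}.fastq above) into a single output.fastq.
import glob
import re
import shutil

parts = sorted(glob.glob("output[0-9]*.fastq"),
               key=lambda p: int(re.search(r"\d+", p).group()))
with open("output.fastq", "wb") as merged:
    for part in parts:
        with open(part, "rb") as chunk:
            shutil.copyfileobj(chunk, merged)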
For a more general FASTQ parallel wrapper see: https://stackoverflow.com/a/41707920/363028
Upvotes: 1
Reputation: 696
For reproducibility I implemented the answer provided by furas in the heavy_lifting.py and wrapper.py scripts. Additional edits were needed to make the code run, which is why I am providing the following.
heavy_lifting.py
#!/usr/bin/env python
import argparse
# Read in arguments
#parser = argparse.ArgumentParser()
#parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
#parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
#args = parser.parse_args()
def heavy_lifting_fun(inputFastq, outputFastq):
    # Iterate through input file and append to output file
    outfile = open(outputFastq, "a")
    with open(inputFastq, "r") as infile:
        for line in infile:
            outfile.write("modified" + line.strip() + "\n")
    outfile.close()


if __name__ == '__main__':
    heavy_lifting_fun()
wrapper.py
#!/usr/bin/env python
import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing
numcores = 4
fastq_F = "fastq_F.fastq"
#Create some logic to split the input fastq file into chunks for parallel processing.
# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
    lines = infile.readlines()

split_size = length_fastq / numcores
while (split_size % 4 != 0):
    split_size += 1
print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        print(counter)
        #if counter == 0 and line[0] != "@":
        #    continue
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        elif counter < split_size:
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
outfile.close()
Parallel(n_jobs=numcores)(delayed(heavy_lifting.heavy_lifting_fun)(i, "output.fastq") for i in split_fastq_list)
Upvotes: 0
Reputation: 142641
Parallel expects a function name, not a file/module name.
So in heavy_lifting you have to put the code in a function (with arguments instead of args):
def my_function(inputFastq, outputFastq):
    with open(inputFastq, "r") as infile:
        with open(outputFastq, "a") as outfile:
            for line in infile:
                outfile.write("modified" + line)
And then you can use
Parallel(n_jobs=numcores)(delayed(heavy_lifting.my_function)(i, "output.fastq") for i in split_fastq_list)
Upvotes: 1