Paul

Reputation: 696

Parallelizing python script with a python wrapper

I have a Python script heavy_lifting.py that I have parallelized using GNU Parallel, called from a bash wrapper script wrapper.sh. I use this to process fastq-formatted files (see example.fastq below). While this works, requiring two interpreters and two sets of dependencies is inelegant. I would like to rewrite the bash wrapper script in Python while achieving the same parallelization.

example.fastq This is an example of an input file that needs to be processed. This input file is often very long (~500,000,000 lines).

@SRR6750041.1 1/1
CTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
+
AAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6
@SRR6750041.2 2/1
CTATANTATTCTATATTTATTCTAGATAAAAGCATTCTATATTTAGCATATGTCTAGCAAAAAAAA
+
AAAAA#EE6EEEEEEEEEEEEAAEEAEEEEEEEEEEEE/EAE/EAE/EA/EAEAAAE//EEAEAA6
@SRR6750041.3 3/1
ATCCANAATGATGTGTTGCTCTGGAGGTACAGAGATAACGTCAGCTGGAATAGTTTCCCCTCACAG
+
AAAAA#EE6E6EEEEEE6EEEEAEEEEEEEEEEE//EAEEEEEAAEAEEEAE/EAEEA6/EEA<E/
@SRR6750041.4 4/1
ACACCNAATGCTCTGGCCTCTCAAGCACGTGGATTATGCCAGAGAGGCCAGAGCATTCTTCGTACA
+
/AAAA#EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEAE/E/<//AEA/EA//E//

Below are minimal reproducible examples of the scripts I am starting out with.

heavy_lifting.py

#!/usr/bin/env python
import argparse

# Read in arguments
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

# Iterate through input file and append to output file
with open(args.inputFastq, "r") as infile:
    with open(args.outputFastq, "a") as outfile:
        for line in infile:
            outfile.write("modified" + line)

wrapper.sh

#!/bin/bash

NUMCORES="4"
FASTQ_F="./fastq_F.fastq"

# split the input fastq for parallel processing. One split fastq file will be created for each core available.
split --number="l/$NUMCORES" "$FASTQ_F" split_fastq_F_

# Feed split fastq files to GNU Parallel to invoke parallel executions of `heavy_lifting.py`
ls split_fastq_F* | awk -F "split_fastq_F" '{print $2}' | parallel "python heavy_lifting.py -i split_fastq_F{} -o output.fastq"

#remove intermediate split fastq files
rm split_fastq_*

To execute these scripts I use the command bash wrapper.sh. You can see that a results file, output.fastq, is created containing the modified fastq records.
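For the example input above, every line is simply prefixed with "modified", so output.fastq begins like this (the order of chunks in the combined file can vary between runs, since the parallel jobs append as they finish):

modified@SRR6750041.1 1/1
modifiedCTGGANAAGTGAAATAATATAAATTTTTCCACTATTGAATAAAAGCAACTTAAATTTTCTAAGTCG
modified+
modifiedAAAAA#EEEEEEEEEEEEEEEEEEEEEEEAEEEEEEEEEEEEEEEEEEEEEEEEEA<AAEEEEE<6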

Below is my attempt to invoke parallel processing using a python wrapper wrapper.py.

wrapper.py

#!/usr/bin/env python

import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing

numcores = 4
fastq_F = "fastq_F.fastq"

#Create some logic to split the input fastq file into chunks for parallel processing.  

# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
    print(length_fastq)
    lines = infile.readlines()
    split_size = length_fastq / numcores
    print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            counter += 1
        elif counter <= split_size:
            outfile.write(line.strip())
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()


Parallel(n_jobs=numcores)(delayed(heavy_lifting)(i, "output.fastq") for i in split_fastq_list)

EDITED to improve reproducibility of wrapper.py

I seem to be most confused about how to properly feed the input arguments into the invocation of "Parallel" in the Python wrapper.py script. Any help is much appreciated!

Upvotes: 0

Views: 514

Answers (4)

Markoyee

Reputation: 67

This article might be helpful. Proposed solution:

import concurrent.futures
import os
from functools import wraps

def make_parallel(func):
    """
        Decorator used to decorate any function which needs to be parallelized.
        The input of the decorated function should be a list in which each element is an instance of the input for the normal function.
        You can also pass in keyword arguments separately.
        :param func: function
            The instance of the function that needs to be parallelized.
        :return: function
    """

    @wraps(func)
    def wrapper(lst, **kwargs):
        """

        :param lst:
            The inputs of the function in a list.
        :return:
        """
        # The maximum number of threads that can be spawned.
        # If the number of threads is too high, the overhead of creating them becomes significant.
        # Here we take the number of CPUs available on the system and multiply it by a constant.
        # On my system I have a total of 8 CPUs, so a maximum of 16 threads will be generated.
        number_of_threads_multiple = 2  # You can change this multiple according to your requirement
        number_of_workers = int(os.cpu_count() * number_of_threads_multiple)
        if len(lst) < number_of_workers:
            # If the list is short, we only require that many threads.
            # Here we avoid creating unnecessary threads.
            number_of_workers = len(lst)

        if number_of_workers:
            if number_of_workers == 1:
                # If the list that needs to be parallelized has a single element,
                # there is no point in parallelizing the function, so we run it serially.
                result = [func(lst[0], **kwargs)]
            else:
                # Core code: spawn the workers and run the decorated function in parallel.
                result = []
                with concurrent.futures.ThreadPoolExecutor(max_workers=number_of_workers) as executor:
                    bag = {executor.submit(func, i, **kwargs): i for i in lst}
                    for future in concurrent.futures.as_completed(bag):
                        result.append(future.result())
        else:
            result = []
        return result
    return wrapper 

Example of usage:

# Paralleized way of calling the function
results = make_parallel(sample_function)(list_of_post_ids)
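To make this concrete, here is a minimal self-contained sketch (sample_function and list_of_post_ids are placeholders; the bodies below are made up for illustration):

import time

def sample_function(post_id):
    # Stand-in for I/O-bound work such as an HTTP request.
    time.sleep(0.1)
    return "post-" + str(post_id)

list_of_post_ids = [1, 2, 3, 4]
results = make_parallel(sample_function)(list_of_post_ids)
# Results are collected in completion order, not input order.
print(results)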

Upvotes: 0

Ole Tange

Reputation: 33685

This should be a comment, because it does not answer the question, but it is too big.

All of wrapper.sh can be written as:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --cat "python heavy_lifting.py -i {} -o output.fastq"

If heavy_lifting.py only reads the file and does not seek, this should work, too, and will require less disk I/O (the temporary file is replaced with a fifo):

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart --fifo "python heavy_lifting.py -i {} -o output.fastq"

It will autodetect the number of CPU threads, split the fastq file at lines that start with @SRR, chop it into one chunk per CPU thread on the fly, and give each chunk to python.

If heavy_lifting.py reads from stdin when no -i is given, then this should work, too:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output.fastq"
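For that to work, heavy_lifting.py needs a stdin fallback. A minimal sketch of such a fallback (this is an assumption, not part of the original script):

#!/usr/bin/env python
import argparse
import sys

parser = argparse.ArgumentParser()
parser.add_argument('-i', '--inputFastq', help='forward .fastq (defaults to stdin)')
parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
args = parser.parse_args()

# Fall back to stdin when no -i is given, so --pipepart can stream a chunk in.
infile = open(args.inputFastq) if args.inputFastq else sys.stdin
with open(args.outputFastq, "a") as outfile:
    for line in infile:
        outfile.write("modified" + line)
if infile is not sys.stdin:
    infile.close()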

If heavy_lifting.py does not append a unique string to output.fastq, it will be overwritten. So it might be better to have GNU Parallel give it a unique name like output2.fastq:

parallel -a ./fastq_F.fastq --recstart @SRR --block -1 --pipepart "python heavy_lifting.py -o output{#}.fastq"

For a more general FASTQ parallel wrapper see: https://stackoverflow.com/a/41707920/363028

Upvotes: 1

Paul

Reputation: 696

For reproducibility I implemented the answer provided by furas in the heavy_lifting.py and wrapper.py scripts. Additional edits were needed to make the code run, which is why I am providing the following.

heavy_lifting.py

#!/usr/bin/env python
import argparse

def heavy_lifting_fun(inputFastq, outputFastq):
    # Iterate through input file and append to output file
    outfile = open(outputFastq, "a")
    with open(inputFastq, "r") as infile:
        for line in infile:
            outfile.write("modified" + line.strip() + "\n")
    outfile.close()

if __name__ == '__main__':
    # Read in arguments only when run as a script, so wrapper.py can
    # import this module without triggering argparse.
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--inputFastq', required=True, help='forward .fastq')
    parser.add_argument('-o', '--outputFastq', required=True, help='output .fastq')
    args = parser.parse_args()
    heavy_lifting_fun(args.inputFastq, args.outputFastq)

wrapper.py

#!/usr/bin/env python

import heavy_lifting
from joblib import Parallel, delayed
import multiprocessing

numcores = 4
fastq_F = "fastq_F.fastq"

#Create some logic to split the input fastq file into chunks for parallel processing.  

# Get input fastq file dimensions
with open(fastq_F, "r") as infile:
    length_fastq = len(infile.readlines())
print(length_fastq)

# Use integer division and round the chunk size up to the next multiple of 4
# so no 4-line fastq record straddles two chunks.
split_size = length_fastq // numcores
while split_size % 4 != 0:
    split_size += 1
print(split_size)

# Iterate through input fastq file writing lines to outfile in bins.
counter = 0
split_counter = 0
split_fastq_list = []
with open(fastq_F, "r") as infile:
    for line in infile:
        if counter == 0:
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        elif counter < split_size:
            outfile.write(str(line.strip() + "\n"))
            counter += 1
        else:
            counter = 0
            split_counter += 1
            outfile.close()
            filename = str("./split_fastq_F_" + str(split_counter))
            split_fastq_list.append(filename)
            outfile = open(filename, "a")
            outfile.write(str(line.strip() + "\n"))
            counter += 1
    outfile.close()

Parallel(n_jobs=numcores)(delayed(heavy_lifting.heavy_lifting_fun)(i, "output.fastq") for i in split_fastq_list)
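Note that all workers append to the same output.fastq concurrently, so records from different chunks can interleave (see Ole Tange's point about unique output names). A safer variant, sketched here with hypothetical per-chunk file names, gives each worker its own output and concatenates afterwards:

import shutil

out_names = ["output_{}.fastq".format(i) for i in range(len(split_fastq_list))]
Parallel(n_jobs=numcores)(
    delayed(heavy_lifting.heavy_lifting_fun)(chunk, out)
    for chunk, out in zip(split_fastq_list, out_names))

# Concatenate the per-chunk outputs into the final file, preserving chunk order.
with open("output.fastq", "wb") as merged:
    for name in out_names:
        with open(name, "rb") as part:
            shutil.copyfileobj(part, merged)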

Upvotes: 0

furas

Reputation: 142641

Parallel expects a function's name, not a file/module name.

So in heavy_lifting you have to put the code in a function (with arguments instead of args):

def my_function(inputFastq, outputFastq):

    with open(inputFastq, "r") as infile:
        with open(outputFastq, "a") as outfile:
            for line in infile:
                outfile.write("modified" + line)

And then you can use:

Parallel(n_jobs=numcores)(delayed(heavy_lifting.my_function)(i, "output.fastq") for i in split_fastq_list)

Upvotes: 1
