Xiangwu

Reputation: 745

Python, process a large text file in parallel

Sample records in the data file (a SAM file):

M01383  0  chr4  66439384  255  31M  *  0  0  AAGAGGA GFAFHGD  MD:Z:31 NM:i:0
M01382  0  chr1  241995435  255 31M  *  0  0  ATCCAAG AFHTTAG  MD:Z:31 NM:i:0
......

I need to go through the records in the data file line by line, get a particular value (e.g. the 4th value, 66439384) from each line, and pass that value to another function for processing. Then some result counters will be updated.

The basic workflow is like this:

# global variables; the counters are updated in search_function according to the value passed
counter_a = 0
counter_b = 0
counter_c = 0

def search_function(value):
    global counter_a, counter_b, counter_c
    # some condition checks:
    #     update counter_a, counter_b or counter_c
    ...

with open(textfile) as f:
    for line in f:
        value = line.split()[3]
        search_function(value)    # this function takes quite a long time per call

With single-process code and a data file of about 1.5 GB, it took about 20 hours to run through all the records in one data file. I need much faster code because there are more than 30 data files of this kind.

I was thinking of processing the data file in N chunks in parallel, with each chunk performing the above workflow and updating the global variables (counter_a, counter_b, counter_c) simultaneously. But I don't know how to achieve this in code, or whether it will work.

I have access to a server machine with 24 processors and around 40 GB of RAM.

Could anyone help with this? Thanks very much.

Upvotes: 7

Views: 13444

Answers (4)

Hugh Bothwell

Reputation: 56624

This solution works on a set of files.

For each file, it divides the file into a specified number of line-aligned chunks, solves each chunk in parallel, then combines the results.

It streams each chunk from disk; this is somewhat slower, but does not consume nearly as much memory. We depend on the disk cache and buffered reads to prevent head thrashing.

Usage is like

python script.py -n 16 sam1.txt sam2.txt sam3.txt

and script.py is

import argparse
from io import SEEK_END 
import multiprocessing as mp

#
# Worker process
#
def summarize(fname, start, stop):
    """
    Process file[start:stop]

    start and stop both point to first char of a line (or EOF)
    """
    a = 0
    b = 0
    c = 0

    with open(fname, newline='') as inf:
        # jump to start position
        pos = start
        inf.seek(pos)

        for line in inf:
            value = int(line.split(None, 4)[3])

            # *** START EDIT HERE ***
            #

            # update a, b, c based on value

            #
            # *** END EDIT HERE ***

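            # with newline='' and ASCII SAM text, len(line) equals the number of
            # bytes consumed, so pos stays in step with the offsets found in main()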
            pos += len(line)
            if pos >= stop:
                break

    return a, b, c

def main(num_workers, sam_files):
    print("{} workers".format(num_workers))
    pool = mp.Pool(processes=num_workers)

    # for each input file
    for fname in sam_files:
        print("Dividing {}".format(fname))
        # decide how to divide up the file
        with open(fname) as inf:
            # get file length
            inf.seek(0, SEEK_END)
            f_len = inf.tell()
            # find break-points
            starts = [0]
            for n in range(1, num_workers):
                # jump to approximate break-point
                inf.seek(n * f_len // num_workers)
                # find start of next full line
                inf.readline()
                # store offset
                starts.append(inf.tell())

        # do it!
        stops = starts[1:] + [f_len]
        start_stops = zip(starts, stops)
        print("Solving {}".format(fname))
        # apply_async (rather than the blocking apply) lets the chunks run concurrently
        async_results = [pool.apply_async(summarize, args=(fname, start, stop))
                         for start, stop in start_stops]
        results = [r.get() for r in async_results]

        # collect results
        results = [sum(col) for col in zip(*results)]
        print(results)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Parallel text processor')
    parser.add_argument('--num_workers', '-n', default=8, type=int)
    parser.add_argument('sam_files', nargs='+')
    args = parser.parse_args()
    main(args.num_workers, args.sam_files)

Upvotes: 1

jfs

Reputation: 414079

Read one file at a time, use all CPUs to run search_function():

#!/usr/bin/env python
from multiprocessing import Array, Pool

def init(counters_): # called once in each child process
    global counters
    counters = counters_

def search_function(value): # assume it is a CPU-intensive task
    result, error = None, None
    # some condition checks; update the shared counters under their lock, e.g.:
    #     with counters.get_lock():
    #         counters[0] += 1 # counter 'a'
    #         counters[1] += 1 # counter 'b'
    return value, result, error

if __name__ == '__main__':
    counters = Array('i', [0]*3)
    pool = Pool(initializer=init, initargs=[counters])
    with open('input.sam') as textfile: # placeholder: path to one SAM data file
        values = (line.split()[3] for line in textfile)
        for value, result, error in pool.imap_unordered(search_function, values,
                                                        chunksize=1000):
            if error is not None:
                print('value: {value}, error: {error}'.format(**vars()))
    pool.close()
    pool.join()
    print(list(counters))

Make sure (for example, by writing wrappers) that exceptions do not escape from next(values) or search_function().
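
A minimal sketch of such a wrapper (safe_search_function is an illustrative name), so that a failure inside one worker is reported per value instead of killing the whole imap_unordered loop:

def safe_search_function(value):
    # catch anything search_function raises and hand it back as the error field
    try:
        return search_function(value)
    except Exception as exc:
        return value, None, exc

Pass safe_search_function (instead of search_function) to pool.imap_unordered and the main loop can log failures per value and keep going.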

Upvotes: 1

Ira Baxter

Reputation: 95326

What you don't want to do is hand whole files to individual CPUs. If you do, the file opens/reads will likely cause the heads to bounce randomly all over the disk, because the files are likely to be spread all over it.

Instead, break each file into chunks and process the chunks.

Open the file with one CPU. Read the whole thing into an array, Text. You want to do this in one massive read to prevent the heads from thrashing around the disk, under the assumption that your file(s) are placed on the disk in relatively large sequential chunks.

Divide its size in bytes by N, giving a (global) value K, the approximate number of bytes each CPU should process. Fork N threads, and hand each thread i its index i and a copied file handle.

Each thread i starts a thread-local scan pointer p into Text at offset i*K. It scans the text, incrementing p and ignoring the text until a newline is found. At this point, it starts processing lines (incrementing p as it scans them). It stops after processing a line once its index into Text is greater than (i+1)*K.

If the amount of work per line is about equal, your N cores will all finish about the same time.
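
A minimal sketch of this scheme in Python, using multiprocessing workers instead of threads (CPython threads would serialize the CPU-bound line handling) and computing line-aligned boundaries up front; the file name and the counter updates are placeholders, and it relies on the fork start method (the Linux default) so the workers inherit Text without copying it:

import multiprocessing as mp

TEXT = b''   # the whole file; forked workers inherit this without copying

def process_chunk(bounds):
    """Process the lines of TEXT[start:stop]; both offsets are line-aligned."""
    start, stop = bounds
    counts = [0, 0, 0]
    pos = start
    while pos < stop:
        end = TEXT.find(b'\n', pos)
        if end == -1:
            end = len(TEXT)
        value = int(TEXT[pos:end].split(None, 4)[3])   # 4th field of the line
        # ... update counts[0], counts[1], counts[2] based on value ...
        pos = end + 1
    return counts

if __name__ == '__main__':
    N = 24
    with open('input.sam', 'rb') as f:   # placeholder file name
        TEXT = f.read()                  # one massive sequential read
    K = len(TEXT) // N                   # approximate bytes per worker
    # round each break-point up to the start of the next full line
    starts = [0]
    for i in range(1, N):
        nl = TEXT.find(b'\n', i * K)
        starts.append(len(TEXT) if nl == -1 else nl + 1)
    stops = starts[1:] + [len(TEXT)]
    with mp.Pool(N) as pool:
        per_chunk = pool.map(process_chunk, list(zip(starts, stops)))
    print([sum(col) for col in zip(*per_chunk)])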

(If you have more than one file, you can then start the next one).

If you know that the file sizes are smaller than memory, you might arrange the file reads to be pipelined, e.g., while the current file is being processed, a file-read thread is reading the next file.
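
A minimal sketch of that pipelining with a single background reader thread (read_file and process_text are illustrative placeholders for the big read and the chunked processing above):

from concurrent.futures import ThreadPoolExecutor

def read_file(fname):
    """Read the whole file in one go; runs in a background thread."""
    with open(fname, 'rb') as f:
        return f.read()

def process_text(text):
    """Placeholder for the chunked, multi-core processing sketched above."""
    ...

if __name__ == '__main__':
    files = ['sam1.txt', 'sam2.txt', 'sam3.txt']   # placeholder file list
    with ThreadPoolExecutor(max_workers=1) as reader:
        pending = reader.submit(read_file, files[0])
        for nxt in files[1:] + [None]:
            text = pending.result()                       # wait for the current file's data
            if nxt is not None:
                pending = reader.submit(read_file, nxt)   # start reading the next file now
            process_text(text)                            # process while that read is in flight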

Upvotes: 0

Anthony Towns

Reputation: 2914

The simplest approach would probably be to do all 30 files at once with your existing code -- it would still take all day, but you'd have all the files done at once. (i.e., "9 babies in 9 months" is easy, "1 baby in 1 month" is hard.)
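
For example, a minimal sketch (process_one_file is a hypothetical wrapper around the existing single-file workflow, and the glob pattern is a placeholder) that spreads the files over the available processors:

import glob
import multiprocessing as mp

def process_one_file(fname):
    # the existing single-file workflow, but returning local counters instead of updating globals
    counter_a = counter_b = counter_c = 0
    with open(fname) as textfile:
        for line in textfile:
            value = line.split()[3]
            # ... run the existing search_function logic on value and update the local counters ...
    return fname, (counter_a, counter_b, counter_c)

if __name__ == '__main__':
    sam_files = glob.glob('*.sam')          # placeholder pattern for the 30 data files
    with mp.Pool(processes=24) as pool:     # one worker per processor
        for fname, counts in pool.imap_unordered(process_one_file, sam_files):
            print(fname, counts)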

If you really want to get a single file done faster, it will depend on how your counters actually update. If almost all the work is in analysing the value, you can offload that using the multiprocessing module:

import time
import multiprocessing

def slowfunc(value):
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0
def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

pool = multiprocessing.Pool(50)
results = []

for value in range(100000):
    r = pool.apply_async(slowfunc, [value])
    results.append(r)

    # don't let the queue grow too long
    if len(results) == 1000:
        results[0].wait()

    while results and results[0].ready():
        r = results.pop(0)
        add_to_counter(r.get())

for r in results:
    r.wait()
    add_to_counter(r.get())

print(counter_a, counter_b, counter_c)

That will allow 50 slowfunc calls to run in parallel, so instead of taking 1000 s (= 100k * 0.01 s), it takes about 20 s (= 100k / 50 * 0.01 s) to complete. If you can restructure your function into "slowfunc" and "add_to_counter" pieces like the above, you should be able to get a factor-of-24 speedup on your 24-processor machine.

Upvotes: 4
