Reputation: 745
Sample records in the data file (SAM file):
M01383 0 chr4 66439384 255 31M * 0 0 AAGAGGA GFAFHGD MD:Z:31 NM:i:0
M01382 0 chr1 241995435 255 31M * 0 0 ATCCAAG AFHTTAG MD:Z:31 NM:i:0
......
I need to go through the records in the data file line by line, get a particular value (e.g. the 4th value, 66439384) from each line, and pass this value to another function for processing. Then some result counters will be updated.
The basic workflow is like this:
# global variables; counters will be updated in search_function
# according to the value passed
counter_a = 0
counter_b = 0
counter_c = 0

with open(data_file) as textfile:
    for line in textfile:
        value = line.split()[3]
        search_function(value)  # this function takes a long time to process

def search_function(value):
    # some conditions checking:
    #     update counter_a or counter_b or counter_c
    ...
With single-process code and a data file of about 1.5 GB, it took about 20 hours to run through all the records in one file. I need much faster code because there are more than 30 data files of this kind.
I was thinking of processing the data file in N chunks in parallel, where each chunk performs the above workflow and updates the global counters (counter_a, counter_b, counter_c) simultaneously. But I don't know how to achieve this in code, or whether it will work.
I have access to a server machine with 24 processors and around 40 GB of RAM.
Could anyone help with this? Thanks very much.
Upvotes: 7
Views: 13444
Reputation: 56624
This solution works on a set of files.
For each file, it divides the file into a specified number of line-aligned chunks, solves each chunk in parallel, then combines the results.
It streams each chunk from disk; this is somewhat slower, but does not consume nearly so much memory. We depend on disk cache and buffered reads to prevent head thrashing.
Usage is like
python script.py -n 16 sam1.txt sam2.txt sam3.txt
and script.py is:
import argparse
from io import SEEK_END
import multiprocessing as mp
#
# Worker process
#
def summarize(fname, start, stop):
    """
    Process file[start:stop].
    start and stop both point to the first char of a line (or to EOF).
    """
    a = 0
    b = 0
    c = 0
    with open(fname, newline='') as inf:
        # jump to start position
        pos = start
        inf.seek(pos)
        for line in inf:
            value = int(line.split()[3])  # 4th field, as in the question
            # *** START EDIT HERE ***
            #
            # update a, b, c based on value
            #
            # *** END EDIT HERE ***
            pos += len(line)
            if pos >= stop:
                break
    return a, b, c
def main(num_workers, sam_files):
    print("{} workers".format(num_workers))
    pool = mp.Pool(processes=num_workers)
    # for each input file
    for fname in sam_files:
        print("Dividing {}".format(fname))
        # decide how to divide up the file
        with open(fname) as inf:
            # get file length
            inf.seek(0, SEEK_END)
            f_len = inf.tell()
            # find break-points
            starts = [0]
            for n in range(1, num_workers):
                # jump to approximate break-point
                inf.seek(n * f_len // num_workers)
                # find start of next full line
                inf.readline()
                # store offset
                starts.append(inf.tell())
        # do it!
        stops = starts[1:] + [f_len]
        start_stops = zip(starts, stops)
        print("Solving {}".format(fname))
        # submit all chunks first, then collect; a plain pool.apply would
        # block on each chunk and run them one at a time
        async_results = [pool.apply_async(summarize, args=(fname, start, stop))
                         for start, stop in start_stops]
        results = [r.get() for r in async_results]
        # combine per-chunk results column by column
        results = [sum(col) for col in zip(*results)]
        print(results)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Parallel text processor')
    parser.add_argument('--num_workers', '-n', default=8, type=int)
    parser.add_argument('sam_files', nargs='+')
    args = parser.parse_args()
    main(args.num_workers, args.sam_files)
Upvotes: 1
Reputation: 414079
Read one file at a time, and use all CPUs to run search_function():
#!/usr/bin/env python
from multiprocessing import Array, Pool

def init(counters_):  # called once in each child process
    global counters
    counters = counters_

def search_function(value):  # assume it is a CPU-intensive task
    result, error = None, None  # placeholders for whatever the real function produces
    # some conditions checking:
    #     update counter a, b, or c, for example:
    with counters.get_lock():  # += on shared memory is not atomic
        counters[0] += 1  # counter 'a'
        counters[1] += 1  # counter 'b'
    return value, result, error

if __name__ == '__main__':
    counters = Array('i', [0] * 3)
    pool = Pool(initializer=init, initargs=[counters])
    with open('data.sam') as textfile:  # your SAM file
        values = (line.split()[3] for line in textfile)
        for value, result, error in pool.imap_unordered(search_function, values,
                                                        chunksize=1000):
            if error is not None:
                print('value: {value}, error: {error}'.format(**vars()))
    pool.close()
    pool.join()
    print(list(counters))
Make sure (for example, by writing wrappers) that exceptions do not escape next(values) or search_function().
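One way to write such wrappers, as a rough sketch; safe_values and safe_search are my names, and this assumes search_function returns just a result rather than the full triple:

def safe_values(lines):
    # wrap the value generator so a malformed line cannot kill the main loop
    for line in lines:
        try:
            yield line.split()[3]
        except IndexError:  # line has fewer than 4 fields
            yield None

def safe_search(value):
    # wrap the worker so exceptions come back as the error field, not a crash
    try:
        if value is None:
            raise ValueError('malformed input line')
        return value, search_function(value), None
    except Exception as e:
        return value, None, str(e)  # exceptions may not pickle cleanly; send the message

You would then pass safe_search and safe_values(textfile) to pool.imap_unordered() in place of search_function and the bare generator.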
Upvotes: 1
Reputation: 95326
What you don't want to do is hand whole files to individual CPUs. If you do that, the file opens/reads will likely cause the heads to bounce randomly all over the disk, because the files are likely to be scattered all over the disk.
Instead, break each file into chunks and process the chunks.
Open the file with one CPU. Read the whole thing into an array, Text. You want to do this in one massive read, to prevent the heads from thrashing around the disk, under the assumption that your file(s) are placed on the disk in relatively large sequential chunks.
Divide its size in bytes by N, giving a (global) value K, the approximate number of bytes each CPU should process. Fork N threads, and hand each thread i its index i and a copied file handle.
Each thread i starts a thread-local scan pointer p into Text at offset i*K. It scans the text, incrementing p and ignoring the text until a newline is found. At this point, it starts processing lines (incrementing p as it scans them). It stops after completing a line, when its index into Text is greater than (i+1)*K.
If the amount of work per line is about equal, your N cores will all finish about the same time.
(If you have more than one file, you can then start the next one).
If you know that the file sizes are smaller than memory, you might arrange the file reads to be pipelined, e.g., while the current file is being processed, a file-read thread is reading the next file.
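A rough sketch of that scan logic, with two caveats: CPython threads will not parallelize a CPU-bound scan, so this version uses multiprocessing instead, and it relies on fork() (Linux) so the workers inherit the in-memory buffer; the filename and the per-line work are placeholders:

import multiprocessing as mp

N = 24      # number of workers; assumes one per core
text = b""  # the whole file, read once by the parent

def scan_slice(i):
    # process every line whose first byte falls inside slice i of the buffer
    k = len(text) // N  # the value K from the description above
    p = i * k
    # advance p to the start of the next full line, unless it is one already
    if i > 0 and text[p - 1:p] != b"\n":
        nl = text.find(b"\n", p)
        if nl == -1:
            return 0  # no complete line starts in this slice
        p = nl + 1
    stop = len(text) if i == N - 1 else (i + 1) * k
    count = 0
    while p < stop:
        end = text.find(b"\n", p)
        if end == -1:
            end = len(text)
        value = int(text[p:end].split()[3])  # 4th field, as in the question
        # ... update counters based on value ...
        count += 1  # placeholder: just count the lines processed
        p = end + 1
    return count

if __name__ == "__main__":
    with open("sam1.txt", "rb") as f:  # hypothetical filename
        text = f.read()  # one massive sequential read
    with mp.Pool(N) as pool:  # forked children inherit text
        print(sum(pool.map(scan_slice, range(N))))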
Upvotes: 0
Reputation: 2914
The simplest approach would probably be to do all 30 files at once with your existing code -- it would still take all day, but you'd have all the files done at once. (i.e., "9 babies in 9 months" is easy, "1 baby in 1 month" is hard.)
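For instance, a rough sketch of that per-file parallelism; process_one_file stands in for your existing single-file loop, and the filenames are made up:

import multiprocessing

def process_one_file(fname):
    # run the existing single-process workflow on one file, returning its counters
    a = b = c = 0
    with open(fname) as textfile:
        for line in textfile:
            value = line.split()[3]
            # ... your search_function logic here, updating a, b and c ...
    return fname, (a, b, c)

if __name__ == "__main__":
    files = ["sam%d.txt" % i for i in range(1, 31)]  # hypothetical names
    with multiprocessing.Pool(24) as pool:  # one worker per core
        for fname, counts in pool.imap_unordered(process_one_file, files):
            print(fname, counts)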
If you really want to get a single file done faster, it will depend on how your counters actually update. If almost all the work is just in analysing value, you can offload that using the multiprocessing module:
import time
import multiprocessing

def slowfunc(value):
    # stand-in for the expensive analysis of one value
    time.sleep(0.01)
    return value**2 + 0.3*value + 1

counter_a = counter_b = counter_c = 0

def add_to_counter(res):
    global counter_a, counter_b, counter_c
    counter_a += res
    counter_b -= (res - 10)**2
    counter_c += (int(res) % 2)

if __name__ == "__main__":
    pool = multiprocessing.Pool(50)
    results = []
    for value in range(100000):
        r = pool.apply_async(slowfunc, [value])
        results.append(r)
        # don't let the queue grow too long
        if len(results) == 1000:
            results[0].wait()
            while results and results[0].ready():
                r = results.pop(0)
                add_to_counter(r.get())
    for r in results:
        r.wait()
        add_to_counter(r.get())
    print(counter_a, counter_b, counter_c)
That will allow 50 slowfuncs to run in parallel, so instead of taking 1000s (= 100k * 0.01s), it takes about 20s (= 100k/50 * 0.01s) to complete. If you can restructure your function into a "slowfunc" and an "add_to_counter" like the above, you should be able to get a factor-of-24 speedup.
Upvotes: 4