I saw somewhere a hint on how to process a large dataset (say lines of text) faster with the multiprocessing module, something like:
```python
# ... (form batch_set = nump batches [= lists of lines to process];
#      batch_set is a list of lists of strings (batches))
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
for p in processes:
    p.start()
for p in processes:
    p.join()
results = sorted([output.get() for p in processes])
# ... (do something with the processed outputs, e.g. print them in order,
#      given that each proc_lines call puts a pair (i, out_batch) on the queue)
```
However, when I run the code with a small number of lines per batch it works fine (e.g. './code.py -x 4:10' for nump=4 and numb=10 lines/batch), but beyond a certain number of lines per batch it hangs (e.g. './code.py -x 4:4000'). When I interrupt it, the traceback mentions `_wait_for_tstate_lock` from the standard `threading` library. It seems the code never reaches the last line shown above...
I provide the code below, in case somebody needs it to answer why this is happening and how to fix it.
```python
#!/usr/bin/env python3
import sys
import multiprocessing as mp

def fabl(numb, nump):
    '''
    Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
    '<idx> my line here' with indexes from 1 to (nump x numb).
    '''
    ret = []
    idx = 1
    for _ in range(nump):
        cb = []
        for _ in range(numb):
            cb.append('%07d my line here' % idx)
            idx += 1
        ret.append(cb)
    return ret

def proc_lines(i, output, rows_in):
    ret = []
    for row in rows_in:
        row = row[0:8] + 'some other stuff\n'  # replacement for the post-idx part
        ret.append(row)
    output.put((i, ret))
    return

def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    nump = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print('waiting for procs to complete...')
    results = sorted([output.get() for p in processes])
    return results

def write_set(proc_batch_set, fout):
    'write p[rocessed]batch_set'
    for _, out_batch in proc_batch_set:
        for row in out_batch:
            fout.write(row)
    return

def main():
    args = sys.argv
    if len(args) < 2:
        print('''
run with args: -x [ NumProc:BatchSize ]
( ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...) )
''')
        sys.exit(0)
    numb = 10  # suppose we need this number of lines/batch : BatchSize
    nump = 4   # number of processes to use.                : NumProcs
    if len(args) > 2 and ':' in args[2]:  # use another np:bs
        nump, numb = map(int, args[2].split(':'))
    batch_set = fabl(numb, nump)  # proc-batch made in here: nump (groups) x numb (lines)
    proc_batch_set = mp_proc(batch_set)
    with open('out-min', 'wt') as fout:
        write_set(proc_batch_set, fout)
    return

if __name__ == '__main__':
    main()
```
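The `Queue` has a limited capacity in practice and can fill up if you do not empty it while the `Process`es are running. `put` itself returns immediately: the data is handed to a background feeder thread that writes it into an OS pipe, and that pipe only holds a limited amount of data. A child process that has put items on a queue will not terminate until all of them have been flushed into the pipe, so you won't be able to `join` the `Process` while its `put` has not completed: the children wait for the parent to read, and the parent waits in `join` for the children to exit. With 10 lines per batch the results fit in the pipe buffer, which is why the small case works. The `_wait_for_tstate_lock` in your traceback most likely comes from the children waiting for their queue's feeder thread.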
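The blocking can be observed without hanging the interpreter by giving `join` a timeout. The sketch below is a hypothetical minimal reproduction, not code from the question: the `producer` function and the 4 MB payload are assumptions chosen only to exceed a typical pipe buffer. After `join(timeout)` the child should still be alive, and it exits only once the parent has called `get`.

```python
import multiprocessing as mp


def producer(output):
    # Hypothetical worker: one ~4 MB item, far more than an OS pipe buffer holds.
    output.put("x" * 4_000_000)


def join_before_get(timeout=1.0):
    """Start one producer and try to join it BEFORE reading the queue.

    Returns (stuck, size, exitcode): stuck is True if the child was still
    alive after join(timeout), i.e. a plain join() would have hung.
    """
    output = mp.Queue()
    p = mp.Process(target=producer, args=(output,))
    p.start()
    p.join(timeout)       # the question's code calls join() without a timeout here
    stuck = p.is_alive()  # child is waiting for its queue feeder thread to flush
    data = output.get()   # reading the queue unblocks the feeder thread
    p.join()              # now the child can exit normally
    return stuck, len(data), p.exitcode


if __name__ == "__main__":
    print(join_before_get())
```

On a typical system this should print `(True, 4000000, 0)`: the child is stuck after the timed join, and exits cleanly once the queue has been drained.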
So I would just modify the `mp_proc` function so that it reads from the queue before joining:
```python
def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    n_process = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(n_process)]
    for p in processes:
        p.start()
    # Empty the queue while the processes are running so there is no
    # issue with incomplete `put` operations.
    results = sorted([output.get() for p in processes])
    # Join the processes to make sure everything finished correctly
    for p in processes:
        p.join()
    return results
```
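If managing the `Queue` by hand is not required, a different technique (swapped in here, not part of the answer above) is `multiprocessing.Pool.map`, which sends each batch to a worker, collects the results and returns them in input order. This is a sketch assuming the per-line transformation from the question; `proc_lines_pool` and `mp_proc_pool` are hypothetical names, and the result is wrapped in `enumerate` so it keeps the `(i, out_batch)` shape that `write_set` expects.

```python
import multiprocessing as mp


def proc_lines_pool(rows_in):
    # Same transformation as the question's proc_lines, but the batch is
    # returned to the pool instead of being put on a Queue.
    return [row[0:8] + 'some other stuff\n' for row in rows_in]


def mp_proc_pool(batch_set):
    # One worker per batch, as in the original; pool.map returns results in
    # the same order as batch_set, so no explicit sort is needed.
    with mp.Pool(processes=len(batch_set)) as pool:
        out_batches = pool.map(proc_lines_pool, batch_set)
    # Pair each batch with its index to match the (i, out_batch) tuples
    # consumed by write_set.
    return list(enumerate(out_batches))


if __name__ == "__main__":
    # 4 batches x 4000 lines, the size that hung in the question.
    batches = [['%07d my line here' % (b * 4000 + k + 1) for k in range(4000)]
               for b in range(4)]
    result = mp_proc_pool(batches)
    print(len(result), repr(result[0][1][0]))
```

The pool's own result-handling thread reads worker output as it arrives, so there is no join-before-get ordering to get wrong.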