vuvu

Reputation: 268

python3 multiprocessing.Process approach fails

I saw somewhere a hint on how to process a large dataset (say lines of text) faster with the multiprocessing module, something like:

... (form batch_set: a list of nump batches, where each batch is a list
     of lines (strings) to process)
nump = len(batch_set)
output = mp.Queue()
processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]

for p in processes:
    p.start()
for p in processes:
    p.join()

results = sorted([output.get() for p in processes])
... (do something with the processed outputs, ex print them in order,
    given that each proc_lines function returns a couple (i, out_batch))

However, when I run the code with a small number of lines per batch it works fine [ex: './code.py -x 4:10' for nump=4 and numb=10 (lines/batch)], while above a certain number of lines per batch it hangs [ex: './code.py -x 4:4000']. When I interrupt it, the traceback points at _wait_for_tstate_lock in the system threading library. It seems the code never reaches the last code line shown above...

I provide the code below, in case somebody needs it to answer why this is happening and how to fix it.

#!/usr/bin/env python3

import sys
import multiprocessing as mp


def fabl(numb, nump):
    '''
    Form And Batch Lines: form nump[roc] groups of numb[atch] indexed lines
    '<idx> my line here' with indexes from 1 to (nump x numb).
    '''
    ret = []
    idx = 1
    for _ in range(nump):
        cb = []
        for _ in range(numb):
            cb.append('%07d my line here' % idx)
            idx += 1
        ret.append(cb)
    return ret


def proc_lines(i, output, rows_in):
    ret = []
    for row in rows_in:
        row = row[0:8] + 'some other stuff\n' # replacement for the post-idx part
        ret.append(row)

    output.put((i,ret))
    return


def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    nump = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i])) for i in range(nump)]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print('waiting for procs to complete...')
    results = sorted([output.get() for p in processes])
    return results


def write_set(proc_batch_set, fout):
    'write p[rocessed]batch_set'
    for _, out_batch in proc_batch_set:
        for row in out_batch:
            fout.write(row)
    return


def main():
    args = sys.argv
    if len(args) < 2:
        print('''
    run with args: -x [ NumProc:BatchSize ]
        ( ex: '-x' | '-x 4:10' (default values) | '-x 4:4000' (hangs...) )
        ''')
        sys.exit(0)

    numb = 10  # suppose we need this number of lines/batch : BatchSize
    nump = 4   # number of processes to use.                : NumProcs
    if len(args) > 2 and ':' in args[2]: # use another np:bs
        nump, numb = map(int, args[2].split(':'))

    batch_set = fabl(numb, nump)  # proc-batch made in here: nump (groups) x numb (lines)
    proc_batch_set = mp_proc(batch_set)

    with open('out-min', 'wt') as fout:
        write_set(proc_batch_set, fout)

    return

if __name__ == '__main__':
    main()

Upvotes: 1

Views: 125

Answers (1)

Thomas Moreau

Reputation: 4467

A Queue has a limited capacity: its underlying pipe can fill up if you do not empty it while the Processes are running. This does not block the execution of your worker code, but you won't be able to join a Process whose put has not completed, because the queue's feeder thread keeps the process alive until everything it put has been flushed to the pipe.
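The mechanism is easy to see in isolation. In a minimal sketch (illustrative, not from the original post), a child puts more data than the queue's underlying pipe can buffer; the child stays alive, held by the queue's feeder thread, until the parent drains the queue:

```python
import multiprocessing as mp

def worker(q):
    # Put ~1 MB of data: far more than a typical pipe buffer (~64 KB),
    # so the queue's feeder thread blocks until the parent reads.
    q.put('x' * 1_000_000)

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()

    p.join(timeout=2)  # join before draining: times out, child still alive
    print('alive after join(2):', p.is_alive())

    data = q.get()     # drain the queue, unblocking the feeder thread...
    p.join()           # ...and now the join completes
    print('alive after get+join:', p.is_alive())
```

With a plain `p.join()` instead of the timeout, this script would hang exactly like the `-x 4:4000` case in the question.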

So I would just modify the mp_proc function such that:

def mp_proc(batch_set):
    'given the batch, disperse it to the number of processes and ret the results'
    n_process = len(batch_set)
    output = mp.Queue()
    processes = [mp.Process(target=proc_lines, args=(i, output, batch_set[i]))
                 for i in range(n_process)]

    for p in processes:
        p.start()

    # Empty the queue while the processes are running so there is no
    # issue with incomplete `put` operations.
    results = sorted([output.get() for p in processes])

    # Join the process to make sure everything finished correctly
    for p in processes:
        p.join()

    return results
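Not part of the original answer, but worth noting: multiprocessing.Pool sidesteps the queue management entirely, since the pool collects return values for you in submission order. A sketch of the same batch processing (the per-line transformation is taken from the question's proc_lines; batch sizes are just examples):

```python
import multiprocessing as mp

def proc_lines(batch):
    # Same per-line transformation as in the question, minus the queue:
    # keep the 8-char index prefix and replace the rest of each line.
    return [row[0:8] + 'some other stuff\n' for row in batch]

if __name__ == '__main__':
    nump, numb = 4, 4000
    batch_set = [['%07d my line here' % (i * numb + j + 1) for j in range(numb)]
                 for i in range(nump)]
    with mp.Pool(nump) as pool:
        # map() preserves batch order, so no (i, result) tagging or
        # sorting is needed, and there is no Queue to fill up.
        results = pool.map(proc_lines, batch_set)
```

This avoids the deadlock by construction, at the cost of pickling each batch's result back to the parent, which Pool does for you.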

Upvotes: 1
