solora

Reputation: 93

python multiprocessing start and close processes independently

I am trying to run inference with TensorFlow using multiprocessing, with each process using one GPU. I have a list of files, input_files[]. Every process gets one file, runs model.predict on it, and writes the results to disk. To move on to the next file, I need to close the process and restart it, because TensorFlow doesn't release GPU memory: if I reuse the same process, I get a memory leak.

I have written the code below, which works: I start 5 processes, close them, and start another 5. The issue is that all processes have to wait for the slowest one before they can move on. How can I start and close each process independently of the others?

Note that Pool.map runs over input_files_small, not input_files.

file1 --> start new process --> run prediction --> close process --> file2 --> start new process --> etc.


for i in range(0, len(input_files), num_process):
    # take the next chunk of files, one per process
    input_files_small = input_files[i:i+num_process]
    try:
        # note the trailing comma: initargs must be a tuple
        process_pool = multiprocessing.Pool(processes=num_process,
                                            initializer=init_worker,
                                            initargs=(gpu_ids,))
        pool_output = process_pool.map(worker_fn, input_files_small)
    finally:
        # tear the pool down so each process (and its GPU memory) is freed
        process_pool.close()
        process_pool.join()
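
For context, simplified versions of the helpers the snippet relies on look something like this (a sketch, not my exact code: the GPU-pinning trick, model path, and file handling are placeholders):

import os
import multiprocessing

import numpy as np

def init_worker(gpu_ids):
    # one common trick: derive a worker index from the pool process's
    # identity and expose only that GPU to TensorFlow
    worker_index = multiprocessing.current_process()._identity[0] - 1
    os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_ids[worker_index % len(gpu_ids)])

def worker_fn(input_file):
    # import TensorFlow only after CUDA_VISIBLE_DEVICES is set, so the
    # process sees just its assigned GPU
    import tensorflow as tf
    model = tf.keras.models.load_model('saved_model_dir')  # placeholder path
    predictions = model.predict(np.load(input_file))
    np.save(input_file + '.pred.npy', predictions)
    return input_file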

Upvotes: 0

Views: 360

Answers (1)

Booboo

Reputation: 44213

There is no need to re-create the processing pool over and over. First, specify maxtasksperchild=1 when creating the pool; this should result in a new process being created for each task submitted, so each file is still handled by a fresh process. Second, instead of method map, use method map_async, which does not block. If your worker function does not return results you need, you can wait implicitly for all submissions to complete with pool.close() followed by pool.join(), as in the first variation below; otherwise use the second variation:

process_pool = multiprocessing.Pool(processes=num_process,
                                    initializer=init_worker,
                                    initargs=(gpu_ids,),
                                    maxtasksperchild=1)
for i in range(0, len(input_files), num_process):
    input_files_small = input_files[i:i+num_process]
    # map_async returns immediately; the next batch queues up right away
    process_pool.map_async(worker_fn, input_files_small)
# wait for all outstanding tasks to complete
process_pool.close()
process_pool.join()

If you need return values from worker_fn:

process_pool = multiprocessing.Pool(processes=num_process,
                                    initializer=init_worker,
                                    initargs=(gpu_ids,),
                                    maxtasksperchild=1)
results = []
for i in range(0, len(input_files), num_process):
    input_files_small = input_files[i:i+num_process]
    results.append(process_pool.map_async(worker_fn, input_files_small))
# get the return values; each get() blocks until its batch is done
pool_outputs = [result.get() for result in results]
# you do not need process_pool.close() and process_pool.join() here

But since some "slow" tasks from an earlier invocation of map_async may still be running when tasks from a later invocation start up, some of those later tasks may still have to wait for a free process. At least all of the processes in the pool should stay fairly busy, though.

If you are expecting exceptions from your worker function and need to handle them in your main process, it gets more complicated.
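
For example, here is a minimal sketch of one way to do it, building on the second variation above: AsyncResult.get() re-raises the first exception that occurred in its map_async batch, so the main process can catch it there.

results = []
for i in range(0, len(input_files), num_process):
    input_files_small = input_files[i:i+num_process]
    results.append(process_pool.map_async(worker_fn, input_files_small))

pool_outputs = []
for result in results:
    try:
        # get() re-raises the first exception raised within this batch
        pool_outputs.append(result.get())
    except Exception as exc:
        # a single failing task discards this whole batch's results
        print(f'a task in this batch failed: {exc!r}')

Note the granularity: one failure invalidates its entire batch. If you need per-file error handling, submit each file with apply_async instead and call get() on each individual result; map_async also accepts an error_callback argument if you only need to be notified of a failure.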

Upvotes: 1
