Reputation: 122
I have a simplified program in Python 3.6 which runs multiple threads. Each thread spawns a process pool and runs a job in parallel.
For some reason, the code works fine on Windows, but after several cycles it hangs on Linux. This could be due to Linux using fork for creating new processes instead of spawn. The start method can be changed, but spawning the processes is too slow for my needs.
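For reference, this is how I switch the start method to spawn (the Windows default) on Linux; a minimal sketch, where the worker function double is just illustrative:

```python
import multiprocessing as mp

def double(x):
    return x * 2

if __name__ == '__main__':
    # Force the 'spawn' start method (the Windows default) on Linux.
    # set_start_method() may be called at most once, before any pools exist.
    mp.set_start_method('spawn')
    with mp.Pool(2) as pool:
        print(pool.map(double, [1, 2, 3]))  # [2, 4, 6]
```

With spawn, each worker re-imports the module instead of inheriting the parent's state, which avoids the fork-plus-threads hazard but makes worker startup noticeably slower.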
Here is the code:
import time
import random
from threading import Thread
from multiprocessing import Pool

NUM_PROCESSES = 2
NUM_TRIALS = 3

random.seed(42)

class TestPool:
    def process(self):
        opt_profiles = ['P1', 'P2', 'P3', 'P4']
        input_data = [1, 2, 3]
        for _ in range(100):
            threads = []
            try:
                for opt_profile in opt_profiles:
                    thread = self.process_async(opt_profile, input_data)
                    threads.append(thread)
            finally:
                print('Waiting for threads to be finished')
                for thread in threads:
                    thread.join()
                print('Threads are finished')

    def process_async(self, opt_profile, input_data):
        thread = Thread(target=self._process_asynch_pool, args=[opt_profile, input_data])
        thread.start()
        return thread

    def _process_asynch_pool(self, opt_profile, input_data):
        print(f'Processing profile: {opt_profile}, data: {input_data}')
        p = Pool(NUM_PROCESSES)
        print(f'Running profile: {opt_profile}')
        processed_data = p.map(self._process_asynch_data, input_data)
        print(f'Closing profile: {opt_profile}')
        p.close()
        print(f'Joining profile: {opt_profile}')
        p.join()
        print('Processes have joined.')

    def _process_asynch_data(self, input_data):
        print(f'Received data: {input_data}')
        result = input_data * 10
        time.sleep(1)
        return result

if __name__ == '__main__':
    pool = TestPool()
    pool.process()
The code hangs on the thread.join() line, but the logs indicate that every process has finished its job.
Edit: In addition to the original system (CentOS, Python3.6), I can reproduce the issue on Ubuntu (WSL) with Python3.8.
Upvotes: 2
Views: 981
Reputation: 44223
The docs for a multiprocessing.pool.Pool instance state:
Warning: multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization. Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).
I agree with the other commenters: I would expect the code to run as is. But you never call terminate, either explicitly or implicitly (which would be the case if you were using the pool instance as a context manager), and you are re-creating pools many times over. One thing to try is to add a call to pool.terminate(), instead of or in addition to the call to pool.join(), and see if that resolves the issue.
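To make the suggestion concrete, here is a minimal sketch of managing the pool's lifetime with a context manager, which calls terminate() on exit (the worker function work is illustrative):

```python
from multiprocessing import Pool

def work(x):
    return x * 10

if __name__ == '__main__':
    # Exiting the 'with' block calls pool.terminate(), releasing the
    # pool's internal resources deterministically instead of relying
    # on the garbage collector.
    with Pool(2) as p:
        results = p.map(work, [1, 2, 3])
    print(results)  # [10, 20, 30]
```

Because map() blocks until all results are in, the work is finished before terminate() runs; you only need an explicit close()/join() pair if you must wait on asynchronously submitted tasks before tearing the pool down.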
Another thing worth trying for diagnostic purposes (besides being more efficient) is to create a single multiprocessing pool at the outset, ideally large enough to handle all the tasks that will be submitted to it for each trial iteration. Since the worker function _process_asynch_data mostly waits because of the call to time.sleep, there is no problem in creating a multiprocessing pool larger than the number of cores you have; it is actually desirable. You will also have to pass the multiprocessing pool to your threads as an argument. For example:
import time
from threading import Thread
from multiprocessing import Pool

NUM_TRIALS = 100
SLEEP_TIME = 1

class TestPool:
    def process(self):
        opt_profiles = ['P1', 'P2', 'P3', 'P4']
        input_data = [1, 2, 3]
        tasks_per_trial = len(opt_profiles) * len(input_data)
        with Pool(tasks_per_trial) as pool:
            for _ in range(NUM_TRIALS):
                threads = []
                try:
                    for opt_profile in opt_profiles:
                        thread = self.process_async(pool, opt_profile, input_data)
                        threads.append(thread)
                finally:
                    print('Waiting for threads to be finished')
                    for thread in threads:
                        thread.join()
                    print('Threads are finished')
            print('Closing pool.')
            pool.close()
            print('Joining pool.')
            pool.join()
            print('Pool has been joined.')
        # implicit pool.terminate() is done here

    def process_async(self, pool, opt_profile, input_data):
        thread = Thread(target=self._process_asynch_pool, args=[pool, opt_profile, input_data])
        thread.start()
        return thread

    def _process_asynch_pool(self, pool, opt_profile, input_data):
        print(f'Processing profile: {opt_profile}, data: {input_data}')
        print(f'Running profile: {opt_profile}')
        processed_data = pool.map(self._process_asynch_data, input_data)

    def _process_asynch_data(self, input_data):
        print(f'Received data: {input_data}')
        result = input_data * 10
        time.sleep(SLEEP_TIME)
        return result

if __name__ == '__main__':
    pool = TestPool()
    pool.process()
Upvotes: 1