Reputation: 634
In the following code, I'm trying to create a sandboxed master-worker system, in which changes to global variables in a worker aren't reflected in the other workers.
To achieve this, a new process is created each time a task is submitted, and to make the execution parallel, the creation of the processes itself is managed by a ThreadPoolExecutor.
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pipe, Process


def task(conn, arg):
    conn.send(arg * 2)


def isolate_fn(fn, arg):
    def wrapped():
        parent_conn, child_conn = Pipe()
        p = Process(target=fn, args=(child_conn, arg), daemon=True)
        try:
            p.start()
            r = parent_conn.recv()
        finally:
            p.join()
        return r
    return wrapped


def main():
    with ThreadPoolExecutor(max_workers=4) as executor:
        pair = []
        for i in range(0, 10):
            pair.append((i, executor.submit(isolate_fn(task, i))))
            # This function makes the program broken.
            print('foo')
        time.sleep(2)
        for arg, future in pair:
            if future.done():
                print('arg: {}, res: {}'.format(arg, future.result()))
            else:
                print('not finished: {}'.format(arg))
    print('finished')


main()
This program works fine until I put the print('foo') call inside the loop. If that call is present, some tasks remain unfinished, and what is worse, the program itself doesn't finish.
The results are not always the same, but the following is a typical output:
foo
foo
foo
foo
foo
foo
foo
foo
foo
foo
arg: 0, res: 0
arg: 1, res: 2
arg: 2, res: 4
not finished: 3
not finished: 4
not finished: 5
not finished: 6
not finished: 7
not finished: 8
not finished: 9
Why is this program so fragile?
I use Python 3.4.5.
Upvotes: 4
Views: 242
Reputation: 116
Try using

from multiprocessing import set_start_method

... rest of your code here ....

if __name__ == '__main__':
    set_start_method('spawn')
    main()
If you search Stack Overflow for Python multiprocessing and multithreading, you will find a fair few questions mentioning similar hanging issues (especially for Python 2.7 and 3.2).
Mixing multithreading and multiprocessing is still a bit of an issue, and even the Python docs for multiprocessing.set_start_method mention that. The underlying problem is that the default 'fork' start method duplicates the parent's memory, including any locks that other threads happen to hold at the moment of the fork, so a child process can start out holding a lock that will never be released. In your case, 'spawn' and 'forkserver' should work without any issues.
Another option might be to use multiprocessing.Pool directly, as in the sketch below, but this may not be possible for you in a more complex use case.
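A minimal sketch of that approach, assuming your goal is still one fresh process per task: the maxtasksperchild=1 argument makes the pool retire each worker after a single task, which gives the same "fresh globals per task" isolation without mixing threads and fork.

from multiprocessing import Pool

def double(arg):
    # Runs in a worker process; changes to globals here never
    # leak back into other workers or into the parent.
    return arg * 2

if __name__ == '__main__':
    # maxtasksperchild=1 replaces each worker process after one task,
    # so every task starts with fresh global state.
    with Pool(processes=4, maxtasksperchild=1) as pool:
        results = pool.map(double, range(10))
    print(results)  # [0, 2, 4, ..., 18]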
By the way, 'not finished' may still appear in your output, as you are not waiting for your subprocesses to finish, but the whole program should not hang anymore and should always finish cleanly.
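If you do want every result, a minimal sketch of waiting properly, replacing the time.sleep(2) in your main() and reusing your pair list of (arg, future) tuples:

from concurrent.futures import wait

# Block until every submitted future has completed, instead of
# sleeping for a fixed two seconds and hoping they are all done.
wait([future for _, future in pair])
for arg, future in pair:
    print('arg: {}, res: {}'.format(arg, future.result()))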
Upvotes: 1
Reputation: 1
You are not creating a new ThreadPoolExecutor every time; rather, you are using the pre-initialized pool for every iteration. I am not able to tell which print statement is hindering you.
Upvotes: 0