Reputation: 51
I am trying to distribute work across multiple processes. Exceptions that occur in another process should be propagated back and handled in the main process. This seems to work for exceptions thrown in worker, but not for Ctrl-C
.
import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback
def worker():
time.sleep(5)
# raise RuntimeError("Some Error")
return True
def main():
with ProcessPoolExecutor() as executor:
stop = False
tasks = set()
while True:
try:
# keep submitting new tasks
if not stop:
time.sleep(0.1)
print("Submitting task to worker")
future = executor.submit(worker)
tasks.add(future)
done, not_done = wait(tasks, timeout=0)
tasks = not_done
# get the result
for future in done:
try:
result = future.result()
except Exception as e:
print(f"Exception in worker: {type(e).__name__}: {e}")
else:
print(f"Worker finished with result: {result}")
# exit loop if there are no tasks left and loop was stopped
if stop:
print(f"Waiting for {len(tasks)} to finish.")
if not len(tasks):
print("Finished all remaining tasks")
break
time.sleep(1)
except KeyboardInterrupt:
print("Recieved Ctrl-C")
stop = True
except Exception as e:
print(f"Caught {e}")
stop = True
if __name__ == "__main__":
main()
Some of my observations:
I suspect that if Ctrl-C is pressed while a process is being spawned it could lead to some unexpected behaviour.
Edit: These problems occur on both Linux and Windows. Ideally there is a solution for both, but in case of doubt the solution should work on Linux
Upvotes: 3
Views: 372
Reputation: 44283
It wasn't clear to me whether you want your worker
function to continue running until completion (normal or abnormal) ignoring any Ctrl-C events. Assuming that to be the case, the following code should work under both Linux and Windows.
The idea is to use a "pool initializer", i.e. a function that will run in each pool process prior to executing submitted tasks. Here the initializer executes code to ignore Ctrl-C events (KeyboardInterrupt
exceptions).
Note that I have made a few other code adjustments (marked with comments).
import time
from concurrent.futures import ProcessPoolExecutor, Future, wait
import traceback
import signal
def init_pool_processes():
# Ignore Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_IGN)
def worker():
time.sleep(5)
# raise RuntimeError("Some Error")
return True
def main():
# Create pool child processes, which will now
# ignore Ctrl-C
with ProcessPoolExecutor(initializer=init_pool_processes) as executor:
stop = False
tasks = set()
while True:
try:
# keep submitting new tasks
if not stop:
print("Submitting task to worker")
future = executor.submit(worker)
tasks.add(future)
# Move to here:
time.sleep(0.1)
done, not_done = wait(tasks, timeout=0)
tasks = not_done
# get the result
for future in done:
try:
result = future.result()
except Exception as e:
print(f"Exception in worker: {type(e).__name__}: {e}")
else:
print(f"Worker finished with result: {result}")
# exit loop if there are no tasks left and loop was stopped
if stop:
print(f"Waiting for {len(tasks)} to finish.")
if not len(tasks):
print("Finished all remaining tasks")
break
time.sleep(1)
except KeyboardInterrupt:
# Ignore future Ctrl-C events:
signal.signal(signal.SIGINT, signal.SIG_IGN)
print("Received Ctrl-C") # spelling
stop = True
except Exception as e:
print(f"Caught {e}")
# Ignore future Ctrl-C events:
if not stop:
signal.signal(signal.SIGINT, signal.SIG_IGN)
stop = True
if __name__ == "__main__":
main()
Upvotes: 1