Reputation: 5074
I need to stop/kill all processes when there is any error/exception. I found a solution on StackOverflow that kills all processes using psutil, but from time to time I hit an issue: after psutil kills the child and main processes, new processes may start and the code continues to execute.
import os
import multiprocessing

import psutil


class MyClass:
    parent_pid = 0
    ids_list = range(300)

    def main(self):
        self.parent_pid = os.getpid()
        pool = multiprocessing.Pool(3)
        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,
                             kwds=dict(country_id=osm_id),
                             error_callback=self.kill_proc_tree)
        pool.close()
        pool.join()

    def kill_proc_tree(self, including_parent=True):
        # NB: Pool calls error_callback with the exception object as its
        # single argument, so the exception binds to including_parent here
        parent = psutil.Process(self.parent_pid)
        children = parent.children(recursive=True)
        for child in children:
            child.kill()
        psutil.wait_procs(children, timeout=5)
        if including_parent:
            parent.kill()
            parent.wait(5)

    def handle_country_or_region(self, country_id=None, queue=None):
        pass  # here I do some task
It seems that I need to terminate the pool rather than kill the processes, but in this case, if I do
pool.close()
pool.terminate()
pool.join()
my terminal stops responding: the new line is completely empty (i.e. without ">>>") and nothing happens.
Ideally I want the following flow: if there is any error/exception, stop/kill all code execution and return to the interactive prompt in the terminal.
Can anyone help me make this work properly? I am using Python 3.5 and Ubuntu 15.10.
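(For reference, one pattern that avoids calling terminate() from inside a pool callback is to collect the AsyncResult objects and wait on them in the main thread; the sketch below uses a hypothetical failing task and is not from the original post:)
import multiprocessing

def some_task(country_id=None):
    # hypothetical stand-in for the real per-country task
    if country_id == 5:
        raise ValueError('failed on id 5')
    return country_id

def run_all(ids):
    pool = multiprocessing.Pool(3)
    results = [pool.apply_async(some_task, kwds=dict(country_id=i))
               for i in ids]
    try:
        for res in results:
            res.get()  # re-raises the first worker exception in the main thread
        pool.close()
    except Exception as err:
        print(err)
        pool.terminate()  # safe here: we are on the main thread
        raise             # propagate, so control returns to the interactive prompt
    finally:
        pool.join()

if __name__ == '__main__':
    run_all(range(300))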
Upvotes: 8
Views: 11442
Reputation: 5074
The solution is quite simple: put the 'killer' function inside main.
The full code looks like this:
import multiprocessing


class MyClass:
    ids_list = range(300)

    def main(self):
        pool = multiprocessing.Pool(3)

        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,
                             kwds=dict(country_id=osm_id),
                             error_callback=kill_pool)
        pool.close()
        pool.join()

    def handle_country_or_region(self, country_id=None, queue=None):
        pass  # here I do some task
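To see the callback in action, here is a minimal, self-contained sketch; the flaky_task function and the failing id are hypothetical, not part of the original code:
import multiprocessing

def flaky_task(country_id=None):
    # hypothetical task: one id raises, which triggers the error_callback
    if country_id == 5:
        raise ValueError('boom on id {}'.format(country_id))

def main():
    pool = multiprocessing.Pool(3)

    def kill_pool(err_msg):
        print(err_msg)
        pool.terminate()

    for osm_id in range(300):
        try:
            pool.apply_async(flaky_task,
                             kwds=dict(country_id=osm_id),
                             error_callback=kill_pool)
        except ValueError:
            # the callback may terminate the pool while we are still
            # submitting; further apply_async calls then raise ValueError
            break
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Because kill_pool is a closure over pool, it runs in the main process (on the pool's result-handler thread) and can terminate the pool directly; a method on the class, as in the question, has no such reference once the pool lives only inside main.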
If anyone needs to use a queue, below is an extended variant of the code, which shows how to finish the queue correctly and avoid zombie processes:
import pickle
import os
import multiprocessing


class MyClass:
    ids_list = list(range(300))  # a list, so that .remove() works below
    folder = os.path.join(os.getcwd(), 'app_geo')
    STOP_TOKEN = 'stop queue'

    def main(self):
        # >>> Queue part shared between processes <<<
        manager = multiprocessing.Manager()
        remove_id_queue = manager.Queue()
        remove_id_process = multiprocessing.Process(target=self.remove_id_from_file,
                                                    args=(remove_id_queue,))
        remove_id_process.start()
        # >>> End of queue part <<<
        pool = multiprocessing.Pool(3)

        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for osm_id in self.ids_list:
            # the manager's queue proxy is picklable, so it can be passed
            # to the pool workers as a keyword argument
            pool.apply_async(self.handle_country_or_region,
                             kwds=dict(country_id=osm_id, queue=remove_id_queue),
                             error_callback=kill_pool)
        pool.close()
        pool.join()
        # >>> Anti-zombie processes queue part <<<
        remove_id_queue.put(self.STOP_TOKEN)
        remove_id_process.join()
        manager.shutdown()
        # >>> End <<<

    def handle_country_or_region(self, country_id=None, queue=None):
        # here I do some task
        queue.put(country_id)

    def remove_id_from_file(self, some_queue):
        while True:
            osm_id = some_queue.get()
            if osm_id == self.STOP_TOKEN:
                return
            self.ids_list.remove(osm_id)
            with open(os.path.join(self.folder, 'ids_list.pickle'), 'wb') as f:
                pickle.dump(self.ids_list, f)
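A note on running this: multiprocessing may re-import the module in child processes, so the entry point should be guarded (a minimal sketch, assuming the class above lives in the same file):
if __name__ == '__main__':
    MyClass().main()
The STOP_TOKEN sentinel is what lets remove_id_from_file return from its while True loop; without it, that process would block forever on some_queue.get() and main would hang on remove_id_process.join().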
Upvotes: 1