TitanFighter

Reputation: 5074

Can't stop/kill all processes produced by multiprocessing.Pool at once

I need to stop/kill all processes as soon as any error/exception occurs. I found a solution on Stack Overflow that kills all processes using psutil, but from time to time I have an issue: when psutil kills the child and main processes, new processes may start and the code continues to execute.

import multiprocessing
import os

import psutil

class MyClass:
    parent_pid = 0
    ids_list = range(300)

    def main(self):
        self.parent_pid = os.getpid()
        pool = multiprocessing.Pool(3)

        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,  
                             kwds=dict(country_id=osm_id),
                             error_callback=self.kill_proc_tree)

        pool.close()
        pool.join()

    def kill_proc_tree(self, including_parent=True):
        parent = psutil.Process(self.parent_pid)
        children = parent.children(recursive=True)

        for child in children:
            child.kill()
        psutil.wait_procs(children, timeout=5)

        if including_parent:
            parent.kill()
            parent.wait(5)

    def handle_country_or_region(self, country_id=None, queue=None):
        pass
        # here I do some task

It seems that I need to terminate the pool rather than kill the processes, but in that case, if I do

pool.close()
pool.terminate()
pool.join()

my terminal hangs: the new line is completely empty (i.e. without ">>>") and nothing happens.

Ideally I want the following flow: if there is any error/exception, stop/kill all code execution and return to the interactive prompt in the terminal.

Can anyone help me make this work properly? I use Python 3.5 and Ubuntu 15.10.

Upvotes: 8

Views: 11442

Answers (1)

TitanFighter

Reputation: 5074

The solution is quite simple: define the 'killer' function inside 'main', so that it can see the pool and call pool.terminate() on it.

The full code looks like this:

import multiprocessing

class MyClass:
    ids_list = range(300)

    def main(self):
        pool = multiprocessing.Pool(3)

        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for osm_id in self.ids_list:
            pool.apply_async(self.handle_country_or_region,     
                             kwds=dict(country_id=osm_id),
                             error_callback=kill_pool)

        pool.close()
        pool.join()

    def handle_country_or_region(self, country_id=None, queue=None):
        pass  # here I do some task
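
A variation on the same idea, as a minimal sketch rather than the code above: the error callback only records the exception and sets a flag, and the main thread is the one that calls pool.terminate(). The names run_until_first_error, fragile_task and pool_factory are hypothetical and exist only for this illustration; fragile_task stands in for the real work and fails on purpose for one id.

```python
import multiprocessing
import threading


def fragile_task(country_id):
    """Hypothetical stand-in for the real work; fails for one id."""
    if country_id == 5:
        raise ValueError("failed on id {}".format(country_id))
    return country_id


def run_until_first_error(task, ids, processes=3,
                          pool_factory=multiprocessing.Pool):
    """Run task(i) for every i in ids and stop the pool on the first error.

    Returns the first exception reported by a task, or None if all
    tasks succeeded. pool_factory is an assumption of this sketch: any
    callable that takes a worker count and returns a Pool-like object.
    """
    failed = threading.Event()  # set from the pool's callback thread
    errors = []

    def on_error(exc):
        # Only record the failure here; the main thread stops the pool.
        errors.append(exc)
        failed.set()

    pool = pool_factory(processes)
    try:
        results = [pool.apply_async(task, (i,), error_callback=on_error)
                   for i in ids]
        pool.close()
        # Poll until either a task has failed or every task has finished.
        while not failed.is_set() and not all(r.ready() for r in results):
            failed.wait(timeout=0.05)
    finally:
        pool.terminate()  # harmless if every task has already finished
        pool.join()
    return errors[0] if errors else None
```

In a script this would be called as run_until_first_error(fragile_task, range(300)) under an if __name__ == "__main__": guard. Keeping terminate() out of the callback means the pool is always shut down from the thread that created it, at the cost of a short polling delay.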

If anyone needs to use a queue, below is an extended variant of the code that shows how to shut the queue down correctly, which avoids leaving zombie processes:

import pickle
import os
import multiprocessing

class MyClass:
    ids_list = list(range(300))  # a list, because remove_id_from_file() calls .remove()
    folder = os.path.join(os.getcwd(), 'app_geo')
    STOP_TOKEN = 'stop queue'

    def main(self):

        # >>> Queue part shared between processes <<<
        manager = multiprocessing.Manager()
        remove_id_queue = manager.Queue()

        remove_id_process = multiprocessing.Process(target=self.remove_id_from_file,
                                                    args=(remove_id_queue,))
        remove_id_process.start()
        # >>> End of queue part <<<

        pool = multiprocessing.Pool(3)

        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for osm_id in self.ids_list:
            # pass the shared queue so the worker can report finished ids
            pool.apply_async(self.handle_country_or_region,
                             kwds=dict(country_id=osm_id,
                                       queue=remove_id_queue),
                             error_callback=kill_pool)

        pool.close()
        pool.join()

        # >>> Anti-zombie processes queue part <<<
        remove_id_queue.put(self.STOP_TOKEN)
        remove_id_process.join()
        manager.shutdown()
        # >>> End of anti-zombie part <<<

    def handle_country_or_region(self, country_id=None, queue=None):
        # here I do some task
        queue.put(country_id)

    def remove_id_from_file(self, some_queue):
        while True:
            osm_id = some_queue.get()
            if osm_id == self.STOP_TOKEN:
                return
            self.ids_list.remove(osm_id)
            with open(os.path.join(self.folder, 'ids_list.pickle'), 'wb') as f:
                pickle.dump(self.ids_list, f)

Upvotes: 1
