Tomer Smadja

Reputation: 66

RuntimeError: Queue objects should only be shared between processes through inheritance

I'm having some trouble with ProcessPoolExecutor. The following code tries to find the shortest path in a WikiRace game: it takes two titles and navigates from one to the other.

Here is my code:

import concurrent.futures
import multiprocessing
# WikiGateway, logger and AsyncConsts come from elsewhere in my project

class AsyncSearch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        # self.manager = multiprocessing.Manager()
        self.q = multiprocessing.Queue()
        # self.q = self.manager.Queue()

    def _add_starting_node_page_to_queue(self):
        start_page = WikiGateway().page(self.start)
        return self._check_page(start_page)

    def _is_direct_path_to_end(self, page):
        return (page.title == self.end) or (page.links.get(self.end) is not None)

    def _add_tasks_to_queue(self, pages):
        for page in pages:
            self.q.put(page)

    def _check_page(self, page):
        global PATH_WAS_FOUND_FLAG
        logger.info('Checking page "{}"'.format(page.title))
        if self._is_direct_path_to_end(page):
            logger.info('##########\n\tFound a path!!!\n##########')
            PATH_WAS_FOUND_FLAG = True
            return True
        else:
            links = page.links
            logger.info("Couldn't find a direct path form \"{}\", "
                        "adding {} pages to the queue.".format(page.title, len(links)))
            self._add_tasks_to_queue(links.values())
            return "Couldn't find a direct path form " + page.title

    def start_search(self):
        global PATH_WAS_FOUND_FLAG
        threads = []
        logger.debug('Running with concurrent processes!')
        if self._add_starting_node_page_to_queue() is True:
            return True
        with concurrent.futures.ProcessPoolExecutor(max_workers=AsyncConsts.PROCESSES) as executor:
            threads.append(executor.submit(self._check_page, self.q.get()))

I'm getting the following exception:

Traceback (most recent call last):
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\queues.py", line 241, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

This is strange, since I'm using multiprocessing.Queue(), which is supposed to be shareable between processes, as the exception itself mentions.
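For reference, the "inheritance" the message refers to means handing the queue to the child process at spawn time, e.g. as a multiprocessing.Process argument. A minimal sketch of that supported pattern (names hypothetical):

import multiprocessing

def worker(q):
    # The queue arrives via the Process args at spawn time
    # ("inheritance"), which is the supported way to share it.
    q.put("hello from the worker")

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))  # queue inherited here
    p.start()
    print(q.get())  # -> "hello from the worker"
    p.join()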

I found this similar question, but couldn't find the answer there.

I tried to use self.q = multiprocessing.Manager().Queue() instead of self.q = multiprocessing.Queue(). I'm not sure whether this gets me anywhere, but the exception I get is different:

Traceback (most recent call last):
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\queues.py", line 241, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\process.py", line 282, in __reduce__
    'Pickling an AuthenticationString object is '
TypeError: Pickling an AuthenticationString object is disallowed for security reasons

Also, when I try to use multiprocessing.Process() instead of ProcessPoolExecutor, I'm unable to stop the processes once I do find a path. I set up a global variable PATH_WAS_FOUND_FLAG to stop the process initiation, but still with no success. What am I missing here?
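From what I've read since, module-level globals are copied per process, so a flag set in one worker may be invisible to the others; a multiprocessing.Event looks like the intended cross-process flag. A minimal sketch of that idea (the worker body is just a stand-in):

import multiprocessing
import random
import time

def search_worker(stop_event):
    # A module-level global set in one process is invisible to the others;
    # a multiprocessing.Event is a flag actually shared across processes.
    while not stop_event.is_set():
        time.sleep(0.1)             # stand-in for checking a page
        if random.random() < 0.05:  # stand-in for "found a path"
            stop_event.set()        # every worker (and the parent) sees this

if __name__ == "__main__":
    stop_event = multiprocessing.Event()
    workers = [multiprocessing.Process(target=search_worker, args=(stop_event,))
               for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("stopped:", stop_event.is_set())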

Upvotes: 1

Views: 693

Answers (1)

Ashley

Reputation: 618

ProcessPoolExecutor.submit(...) will not pickle multiprocessing.Queue instances, nor other shared multiprocessing.* class instances. (Submitting the bound method self._check_page forces self, and therefore the queue stored on it, to be pickled for transfer to a worker, which is what triggers your exception.) You can do one of two things: use a SyncManager queue, or initialize each worker with the multiprocessing.Queue instance at ProcessPoolExecutor construction time. Both are shown below.

Following is your original variation with a couple of fixes applied (see the note at the end). With this variation, multiprocessing.Queue operations are slightly faster than in the SyncManager variation below.

import multiprocessing
import concurrent.futures

global_page_queue = multiprocessing.Queue()

def set_global_queue(q):
    # Runs once in each worker process at pool start-up, installing the
    # queue passed through initargs (the permitted inheritance path).
    global global_page_queue
    global_page_queue = q

class AsyncSearch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        #self.q = multiprocessing.Queue()
    ...
    def _add_tasks_to_queue(self, pages):
        for page in pages:
            #self.q.put(page)
            global_page_queue.put(page)

    @staticmethod
    def _check_page(self, page):
        ...

    def start_search(self):
        ...
        print('Running with concurrent processes!')
        with concurrent.futures.ProcessPoolExecutor(
                max_workers=5, 
                initializer=set_global_queue, 
                initargs=(global_page_queue,)) as executor:
            f = executor.submit(AsyncSearch._check_page, self, global_page_queue.get())
            r = f.result()
            print(f"result={r}")

Following is the SyncManager variation, where queue operations are slightly slower than in the multiprocessing.Queue variation above...

import multiprocessing
import concurrent.futures

class AsyncSearch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.q = multiprocessing.Manager().Queue()
    ...
    @staticmethod
    def _check_page(self, page):
        ...

    def start_search(self):
        global PATH_WAS_FOUND_FLAG
        worker_process_futures = []
        print(f'Running with concurrent processes!')
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
            worker_process_futures.append(executor.submit(AsyncSearch._check_page, self, self.q.get()))
            r = worker_process_futures[0].result()
            print(f"result={r}")

Note that for some shared objects, SyncManager can be anywhere from slightly to noticeably slower than the multiprocessing.* equivalents. For example, a multiprocessing.Value lives in shared memory, whereas a SyncManager.Value lives in the sync manager's process, so every interaction with it incurs IPC overhead.
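For example, a quick timing sketch of that difference (absolute numbers are hypothetical and will vary by machine):

import multiprocessing
import time

if __name__ == "__main__":
    shm_value = multiprocessing.Value('i', 0)             # lives in shared memory
    mgr_value = multiprocessing.Manager().Value('i', 0)   # lives in the manager process

    t0 = time.perf_counter()
    for _ in range(10_000):
        with shm_value.get_lock():
            shm_value.value += 1
    t1 = time.perf_counter()
    for _ in range(10_000):
        mgr_value.value += 1  # each access is an IPC round-trip to the manager
    t2 = time.perf_counter()

    print(f"multiprocessing.Value: {t1 - t0:.3f}s, Manager().Value: {t2 - t1:.3f}s")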

As an aside, unrelated to your question: your original code was calling _check_page with incorrect parameters, passing the dequeued item where self was expected and leaving the page parameter None. I resolved this by changing _check_page to a static method and passing self explicitly.

Upvotes: 2
