Reputation: 66
I'm having some trouble with ProcessPoolExecutor.
The following code tries to find the shortest path in a WikiRace game: it takes two page titles and navigates from one to the other.
Here is my code:
import concurrent.futures
import multiprocessing


class AsyncSearch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        # self.manager = multiprocessing.Manager()
        self.q = multiprocessing.Queue()
        # self.q = self.manager.Queue()

    def _add_starting_node_page_to_queue(self):
        start_page = WikiGateway().page(self.start)
        return self._check_page(start_page)

    def _is_direct_path_to_end(self, page):
        return (page.title == self.end) or (page.links.get(self.end) is not None)

    def _add_tasks_to_queue(self, pages):
        for page in pages:
            self.q.put(page)

    def _check_page(self, page):
        global PATH_WAS_FOUND_FLAG
        logger.info('Checking page "{}"'.format(page.title))
        if self._is_direct_path_to_end(page):
            logger.info('##########\n\tFound a path!!!\n##########')
            PATH_WAS_FOUND_FLAG = True
            return True
        else:
            links = page.links
            logger.info("Couldn't find a direct path from \"{}\", "
                        "adding {} pages to the queue.".format(page.title, len(links)))
            self._add_tasks_to_queue(links.values())
            return "Couldn't find a direct path from " + page.title

    def start_search(self):
        global PATH_WAS_FOUND_FLAG
        threads = []
        logger.debug(f'Running with concurrent processes!')
        if self._add_starting_node_page_to_queue() is True:
            return True
        with concurrent.futures.ProcessPoolExecutor(max_workers=AsyncConsts.PROCESSES) as executor:
            threads.append(executor.submit(self._check_page, self.q.get()))
I'm getting the following exception:
Traceback (most recent call last):
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\queues.py", line 241, in _feed
obj = _ForkingPickler.dumps(obj)
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
It's weird, since I'm using multiprocessing.Queue(), which should be shareable between processes, as the exception itself mentions.
I found this similar question but couldn't find the answer there.
I tried using self.q = multiprocessing.Manager().Queue() instead of self.q = multiprocessing.Queue(). I'm not sure whether this takes me anywhere, but the exception I get is different:
Traceback (most recent call last):
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\queues.py", line 241, in _feed
obj = _ForkingPickler.dumps(obj)
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "c:\users\tomer smadja\appdata\local\programs\python\python36-32\lib\multiprocessing\process.py", line 282, in __reduce__
'Pickling an AuthenticationString object is '
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
Also, when I try to use multiprocessing.Process() instead of ProcessPoolExecutor, I'm unable to stop the processes once I do find a path. I set up a global flag, PATH_WAS_FOUND_FLAG, to stop spawning new processes (roughly as sketched below), but still with no success. What am I missing here?
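The gist of that Process-based attempt was something along these lines (simplified and reconstructed for the question, not my exact code):

    # simplified sketch of the multiprocessing.Process attempt, replacing start_search above
    def start_search(self):
        global PATH_WAS_FOUND_FLAG
        processes = []
        if self._add_starting_node_page_to_queue() is True:
            return True
        # keep spawning workers until some worker sets the flag
        while not PATH_WAS_FOUND_FLAG:
            p = multiprocessing.Process(target=self._check_page, args=(self.q.get(),))
            p.start()
            processes.append(p)
        for p in processes:
            p.join()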
Upvotes: 1
Views: 693
Reputation: 618
ProcessPoolExecutor.submit(...) will not pickle multiprocessing.Queue instances, nor other shared multiprocessing.* class instances. You can do one of two things: use a SyncManager, or initialize each worker with the multiprocessing.Queue instance at ProcessPoolExecutor construction time. Both are shown below.
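To make the second option concrete in isolation, here is a minimal sketch of the initializer/initargs mechanism; the names init_worker and consume_one are only illustrative, and note that ProcessPoolExecutor gained initializer/initargs in Python 3.7:

import multiprocessing
import concurrent.futures

_queue = None  # set in each worker process by the pool initializer

def init_worker(q):
    # Runs once in every worker process; stores the inherited queue globally.
    global _queue
    _queue = q

def consume_one():
    # Runs in a worker process and uses the queue installed by init_worker.
    return _queue.get()

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put('hello')
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=1,
            initializer=init_worker,
            initargs=(q,)) as executor:
        print(executor.submit(consume_one).result())  # prints 'hello'

The point is that the queue is handed to each worker once, when the pool starts its processes (i.e. through inheritance/spawning), instead of being pickled along with every submitted task.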
The following is your original variation with a couple of fixes applied (see the note at the end). With this variation, multiprocessing.Queue operations are slightly faster than in the SyncManager variation further below.
global_page_queue = multiprocessing.Queue()

def set_global_queue(q):
    global global_page_queue
    global_page_queue = q


class AsyncSearch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        #self.q = multiprocessing.Queue()
        ...

    def _add_tasks_to_queue(self, pages):
        for page in pages:
            #self.q.put(page)
            global_page_queue.put(page)

    @staticmethod
    def _check_page(self, page):
        ...

    def start_search(self):
        ...
        print(f'Running with concurrent processes!')
        with concurrent.futures.ProcessPoolExecutor(
                max_workers=5,
                initializer=set_global_queue,
                initargs=(global_page_queue,)) as executor:
            f = executor.submit(AsyncSearch._check_page, self, global_page_queue.get())
            r = f.result()
            print(f"result={r}")
The following is the SyncManager variation, where queue operations are slightly slower than in the multiprocessing.Queue variation above.
import multiprocessing
import concurrent.futures


class AsyncSearch:
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.q = multiprocessing.Manager().Queue()
        ...

    @staticmethod
    def _check_page(self, page):
        ...

    def start_search(self):
        global PATH_WAS_FOUND_FLAG
        worker_process_futures = []
        print(f'Running with concurrent processes!')
        with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
            worker_process_futures.append(executor.submit(AsyncSearch._check_page, self, self.q.get()))
            r = worker_process_futures[0].result()
            print(f"result={r}")
Note that for some shared objects, SyncManager can be anywhere from slightly to noticeably slower than the multiprocessing.* equivalents. For example, a multiprocessing.Value lives in shared memory, whereas a SyncManager Value lives in the sync manager's process, so every interaction with it incurs inter-process overhead.
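As a rough, single-process illustration of that overhead (a toy timing sketch, not a benchmark of your code):

import multiprocessing
import time

def time_increments(value, n=10000):
    # Bump a shared counter n times and return the elapsed time.
    start = time.perf_counter()
    for _ in range(n):
        value.value += 1
    return time.perf_counter() - start

if __name__ == '__main__':
    # multiprocessing.Value lives in shared memory, so access is direct.
    shm_value = multiprocessing.Value('i', 0, lock=False)
    shm_time = time_increments(shm_value)

    # A SyncManager Value lives inside the manager process, so every access
    # is an IPC round trip to that process.
    manager = multiprocessing.Manager()
    mgr_value = manager.Value('i', 0)
    mgr_time = time_increments(mgr_value)

    print(f"shared-memory Value: {shm_time:.4f}s, manager Value: {mgr_time:.4f}s")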
As an aside, unrelated to your question: your original code was calling _check_page with incorrect arguments, passing the dequeued item as self and leaving the page parameter None. I resolved this by making _check_page a static method and passing self explicitly.
Upvotes: 2