Fanchen Bao

Reputation: 4279

Why does using "fork" work but using "spawn" fail in Python 3.8+ `multiprocessing`?

I work on macOS and was recently bitten by the change of the default start method from "fork" to "spawn" in Python 3.8's multiprocessing (see the docs). Below is a simplified example in which using "fork" succeeds but using "spawn" fails. The purpose of the code is to create a custom queue class that supports calling size() under macOS, hence the inheritance from the Queue class and the fetching of multiprocessing's context.

import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep


class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def call(self):
        return print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # this would fail
    # multiprocessing.set_start_method('fork')  # this would succeed
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join(timeout=1)

The error message output when using "spawn" is shown below.

Process Process-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/[email protected]/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/[email protected]/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
    q.call()
  File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
    return print(self.size)
AttributeError: 'Q' object has no attribute 'size'

It seems that the child process deems self.size unnecessary for code execution, so it is not copied over. My question is: why does this happen?
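One observation from poking at the source: `multiprocessing.queues.Queue` defines its own pickling hooks, which would explain why an extra instance attribute disappears when "spawn" (which pickles the queue to send it to the child) is used. A quick check:

```python
import multiprocessing.queues

# Queue defines __getstate__/__setstate__ in its own class dict, so pickling
# goes through them and ignores __dict__ entirely; any extra instance
# attribute such as self.size is silently dropped on the way to the child.
print('__getstate__' in vars(multiprocessing.queues.Queue))  # prints True
print('__setstate__' in vars(multiprocessing.queues.Queue))  # prints True
```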

Code snippet tested under macOS Catalina 10.15.6, Python 3.8.5

Upvotes: 4

Views: 3146

Answers (2)

Basil Musa

Reputation: 8718

You can also use `multiprocessing.Manager().Queue()`.
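A manager queue is a proxy object, so it pickles cleanly across a "spawn" boundary, and `qsize()` works even on macOS because the real queue lives in the manager process. A minimal sketch (the `worker` function name is just for illustration):

```python
import multiprocessing


def worker(q):
    q.put('done')


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn', force=True)
    with multiprocessing.Manager() as manager:
        q = manager.Queue()  # proxy: picklable, safe to pass to spawned children
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        p.join()
        # qsize() works on macOS here because the underlying queue.Queue
        # lives in the manager process, not behind sem_getvalue().
        print(q.qsize())
```

The trade-off is that every put/get goes through IPC with the manager process, so it is slower than a plain `multiprocessing.Queue`.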

Upvotes: 1

SergeR

Reputation: 66

The problem is that spawned processes do not share the parent's resources, so the queue instance must be pickled and recreated in each child process; for your extra attributes to survive that round trip, you need to add serialization and deserialization methods. Here is working code:

# Portable queue
# The idea of Victor Terron used in Lemon project (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added to share Queue instance between processes correctly.

import multiprocessing
import multiprocessing.queues

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

    """

    def __init__(self, n = 0):
        self.count = multiprocessing.Value('i', n)

    def __getstate__(self):
        return (self.count,)

    def __setstate__(self, state):
        (self.count,) = state

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value

class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self._counter = SharedCounter(0)

    def __getstate__(self):
        return super().__getstate__() + (self._counter,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._counter = state[-1]

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self._counter.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self._counter.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self._counter.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()
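The same `__getstate__`/`__setstate__` pattern can be applied directly to the `Q` class from the question; a sketch of the minimal fix (the only additions relative to the original are the two pickling methods):

```python
import multiprocessing
import multiprocessing.queues


class Q(multiprocessing.queues.Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def __getstate__(self):
        # Append the extra attribute to the parent's pickled state so it
        # survives the trip to the spawned child.
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self.size = state[-1]

    def call(self):
        return print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn', force=True)
    q = Q()
    p = multiprocessing.Process(target=foo, args=(q,))
    p.start()
    p.join()
```

With this change the child prints `1` under "spawn" instead of raising `AttributeError`.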

Upvotes: 5
