Brent Hronik

Reputation: 2427

Mock map_async function argument yields PicklingError

I'm attempting to write some unit tests around a function that performs a map_async() operation. More specifically, I want to confirm that some files get cleaned up if an Exception occurs in one of the processes. Sample pseudo-code with my intentions is provided below.

foo.py

from multiprocessing import Pool

def write_chunk(chunk):
    ...  # create file from chunk
    return created_filename

class Foo:
    def write_parallel(self, chunks):
        filenames = set()
        try:
            pool = Pool(processes=2)
            pool.map_async(write_chunk, chunks, callback=filenames.add)
        except Exception:
            pass  # handle exception
        finally:
            cleanup_files(filenames)

test_foo.py

@patch("foo.write_chunk")
def test_write_parallel_exception_cleanup(self, mock_write_chunk):
    def mock_side_effect(chunk):
        if chunk == "chunk_1":
            raise Exception
        return chunk
    mock_write_chunk.side_effect = mock_side_effect

    foo = Foo()
    foo.write_parallel({"chunk_1", "chunk_2"})
    # assert "chunk_2" is cleaned up and the exception is raised.

However, when I run the test, it fails with: PicklingError: Can't pickle <class 'mock.MagicMock'>: it's not the same object as mock.MagicMock.

Any ideas how to perform the desired result of replacing the mapped function with my own mock function?
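For context, the error can be reproduced without multiprocessing at all: mock gives every MagicMock instance its own dynamically created class, so when pickle looks the class up by name it finds a different object and refuses. A minimal sketch (using Python 3's unittest.mock; the question's traceback is from the older mock package, which behaves the same way):

```python
# Minimal reproduction of the PicklingError, independent of multiprocessing.
# map_async must pickle the mapped function to ship it to a worker process;
# a MagicMock cannot be pickled because each instance gets its own
# dynamically created subclass, which is not the class pickle finds by name.
import pickle
from unittest.mock import MagicMock

mocked_func = MagicMock()
try:
    pickle.dumps(mocked_func)  # what map_async effectively has to do
    raised = False
except pickle.PicklingError:
    raised = True
assert raised
```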

Upvotes: 1

Views: 568

Answers (2)

Antonella Barbero

Reputation: 101

You can mock map_async() and create a fixture for it using pytest. This way the code executes synchronously for testing purposes, enabling patching/mocking and avoiding the pickling error.

from pytest import fixture


class MockPoolMapAsyncResult:
    """
    Mock for the `multiprocessing.pool.Pool.map_async` method.

    This mock executes the code in a synchronous way and enables the
    usage of patches and mocks within the tests that run
    multiprocessing code. It also avoids errors with pickling.
    """
    def __init__(self, func, args):
        self._func = func
        self._args = args

    def get(self, timeout=0):
        return [self._func(arg) for arg in self._args]


@fixture(autouse=True)
def mock_pool_map_async(monkeypatch):
    # **kwargs swallows extra keywords such as callback=... (ignored here),
    # so calls like the one in the question do not raise a TypeError.
    monkeypatch.setattr(
        "multiprocessing.pool.Pool.map_async",
        lambda self, func, args=(), **kwargs: MockPoolMapAsyncResult(func, args))

Source: https://gist.github.com/antobarbero/de3bfd6e68672a5b305854d8c9e8cb5c
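A quick standalone sanity check of this approach (the class is repeated here so the snippet runs on its own): because nothing is pickled, a MagicMock can be mapped just like a plain function.

```python
# Standalone check of the synchronous stand-in for map_async.
from unittest.mock import MagicMock


class MockPoolMapAsyncResult:
    def __init__(self, func, args):
        self._func = func
        self._args = args

    def get(self, timeout=0):
        return [self._func(arg) for arg in self._args]


# A plain function behaves like the real AsyncResult.get().
print(MockPoolMapAsyncResult(str.upper, ["a", "b"]).get())  # ['A', 'B']

# A MagicMock works too -- nothing is pickled, so no PicklingError.
mocked = MagicMock(side_effect=lambda chunk: chunk + "_written")
print(MockPoolMapAsyncResult(mocked, ["c1"]).get())  # ['c1_written']
```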

Upvotes: 0

Brent Hronik

Reputation: 2427

Since the issue stemmed from trying to both mock and pickle the same function, I pulled the functionality out into a separate function and mocked that one instead, leaving the original mapped function picklable. See below:

foo.py

from multiprocessing import Pool

def write_chunk(chunk):
    return write_chunk_wrapped(chunk)

def write_chunk_wrapped(chunk):
    ...  # create file from chunk
    return created_filename

class Foo:
    def write_parallel(self, chunks):
        filenames = set()
        try:
            pool = Pool(processes=2)
            pool.map_async(write_chunk, chunks, callback=filenames.add)
        except Exception:
            pass  # handle exception
        finally:
            cleanup_files(filenames)

test_foo.py

@patch("foo.write_chunk_wrapped")
def test_write_parallel_exception_cleanup(self, mock_write_chunk_wrapped):
    def mock_side_effect(chunk):
        if chunk == "chunk_1":
            raise Exception
        return chunk
    mock_write_chunk_wrapped.side_effect = mock_side_effect

    foo = Foo()
    foo.write_parallel({"chunk_1", "chunk_2"})
    # assert "chunk_2" is cleaned up and the exception is raised.
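A related sketch of why this works, plus a hypothetical variant: instead of a wrapper, you can patch with a real module-level function via patch's new= argument. Module-level functions pickle by reference (module plus qualified name), which is exactly the property a MagicMock lacks. The fake_write_chunk name below is illustrative, not from the answer above:

```python
import os
import pickle

# Module-level functions pickle by reference, so they round-trip to the
# very same object -- a MagicMock cannot do this.
assert pickle.loads(pickle.dumps(os.path.basename)) is os.path.basename

# Hypothetical picklable stand-in, usable as
#   @patch("foo.write_chunk", new=fake_write_chunk)
# (it must live at module level in a module the worker processes can import):
def fake_write_chunk(chunk):
    if chunk == "chunk_1":
        raise Exception("simulated failure")
    return chunk

print(fake_write_chunk("chunk_2"))  # chunk_2
```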

Upvotes: 1
