Reputation: 2427
I am trying to write some unit tests around a function that performs a map_async() operation. More specifically, I want to confirm that some files get cleaned up when an Exception occurs in one of the processes. Sample pseudo-code showing my intent is provided below.
foo.py
from multiprocessing import Pool

def write_chunk(chunk):
    ...  # create file from chunk
    return created_filename

class Foo:
    def write_parallel(self, chunks):
        filenames = set()
        try:
            pool = Pool(processes=2)
            # map_async passes the callback the full list of results
            pool.map_async(write_chunk, chunks, callback=filenames.update)
        except Exception:
            pass  # handle exception
        finally:
            cleanup_files(filenames)
test_foo.py
@patch("foo.write_chunk")
def test_write_parallel_exception_cleanup(self, mock_write_chunk):
    def mock_side_effect(chunk):
        if "chunk_1" == chunk:
            raise Exception
        else:
            return chunk

    mock_write_chunk.side_effect = mock_side_effect
    foo = Foo()
    foo.write_parallel({"chunk_1", "chunk_2"})
    # assert "chunk_2" cleaned up and exception is thrown.
However, when I run the test, I get the following PicklingError:
PicklingError: Can't pickle <class 'mock.MagicMock'>: it's not the same object as mock.MagicMock
Any ideas on how to replace the mapped function with my own mock function?
Upvotes: 1
Views: 568
Reputation: 101
You can mock map_async() and create a fixture for it using pytest. This way the code executes synchronously for testing purposes, which enables patching/mocking and avoids the pickling error.
from pytest import fixture


class MockPoolMapAsyncResult:
    """
    Mock for the `multiprocessing.pool.Pool.map_async` method.
    This mock executes the code in a synchronous way and enables the
    usage of patches and mocks within the tests
    that run multiprocessing code. It also avoids errors with pickling.
    """

    def __init__(self, func, args):
        self._func = func
        self._args = args

    def get(self, timeout=0):
        # Run every call in the current process; timeout is ignored.
        result = [self._func(args) for args in self._args]
        return result


@fixture(autouse=True)
def mock_pool_map_async(monkeypatch):
    monkeypatch.setattr(
        "multiprocessing.pool.Pool.map_async",
        lambda self, func, args=(): MockPoolMapAsyncResult(func, args),
    )
Source: https://gist.github.com/antobarbero/de3bfd6e68672a5b305854d8c9e8cb5c
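Note that the lambda above only accepts func and args, so a call that also passes callback=..., as in the question, would probably fail with a TypeError. Below is a minimal sketch of a variant that accepts the same keyword arguments as the real map_async. The names SyncMapResult and sync_map_async are made up for this illustration and are not part of the gist or of multiprocessing. It uses unittest.mock.patch.object so it runs without pytest, but the same function could be handed to monkeypatch.setattr.

```python
from multiprocessing.pool import Pool
from unittest.mock import patch


class SyncMapResult:
    """Hypothetical synchronous stand-in for the result of Pool.map_async."""

    def __init__(self, func, iterable, callback=None, error_callback=None):
        self._value = None
        self._error = None
        try:
            # Run every call in the current process, so nothing is pickled.
            self._value = [func(item) for item in iterable]
        except Exception as exc:
            self._error = exc
            if error_callback is not None:
                error_callback(exc)
        else:
            if callback is not None:
                # Like the real method, pass the whole result list at once.
                callback(self._value)

    def get(self, timeout=None):
        # Re-raise the stored exception, otherwise return the results.
        if self._error is not None:
            raise self._error
        return self._value


def sync_map_async(self, func, iterable, chunksize=None,
                   callback=None, error_callback=None):
    """Replacement for Pool.map_async that runs everything eagerly."""
    return SyncMapResult(func, iterable, callback, error_callback)


def patch_map_async():
    """Return a patcher usable as a context manager or decorator."""
    return patch.object(Pool, "map_async", sync_map_async)
```

As with the real result object, an exception raised by the mapped function only reaches the caller through get() or error_callback, so the code under test has to call one of them for the failure to become visible.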
Upvotes: 0
Reputation: 2427
Since the issue stemmed from trying to both mock and pickle the same function, I decided to pull the functionality out into a separate function and mock that one instead, which leaves the original function picklable. See below:
foo.py
from multiprocessing import Pool

def write_chunk(chunk):
    # Thin, picklable wrapper; tests patch write_chunk_wrapped instead.
    return write_chunk_wrapped(chunk)

def write_chunk_wrapped(chunk):
    ...  # create file from chunk
    return created_filename

class Foo:
    def write_parallel(self, chunks):
        filenames = set()
        try:
            pool = Pool(processes=2)
            # map_async passes the callback the full list of results
            pool.map_async(write_chunk, chunks, callback=filenames.update)
        except Exception:
            pass  # handle exception
        finally:
            cleanup_files(filenames)
test_foo.py
@patch("foo.write_chunk_wrapped")
def test_write_parallel_exception_cleanup(self, mock_write_chunk_wrapped):
    def mock_side_effect(chunk):
        if "chunk_1" == chunk:
            raise Exception
        else:
            return chunk

    mock_write_chunk_wrapped.side_effect = mock_side_effect
    foo = Foo()
    foo.write_parallel({"chunk_1", "chunk_2"})
    # assert "chunk_2" cleaned up and exception is thrown.
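For anyone wondering why the wrapper helps, here is a small sketch of my understanding. A module-level function is pickled by reference (its module and name), whereas a MagicMock cannot be pickled, apparently because each mock instance gets its own generated class that pickle cannot look up by name. The can_pickle helper is made up for this illustration, and json.dumps merely stands in for any importable module-level function such as write_chunk.

```python
import json
import pickle
from unittest.mock import MagicMock


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


# A module-level function is pickled by reference (module and name),
# so unpickling yields the very same function object.
restored = pickle.loads(pickle.dumps(json.dumps))
function_round_trips = restored is json.dumps

# A MagicMock cannot be serialized this way.
mock_is_picklable = can_pickle(MagicMock())
```

One caveat: this approach presumably relies on the worker processes being forked after the patch is applied, so that they inherit the patched foo module. With the spawn or forkserver start methods the workers import foo afresh and would likely call the real write_chunk_wrapped.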
Upvotes: 1