Reputation: 17042
I was able to test the implementation without multiprocessing using the code below.
import unittest
from unittest.mock import patch
def side_effect_cube(x):
return x**3
@patch("test.data.geocode.test_geo_thread.cube", side_effect=side_effect_cube)
def test_sum(mock_cube):
assert find_cube(7) == [1, 8, 27, 64, 125, 216]
assert mock_cube.called
def find_cube(num):
result = []
for i in range(1,num):
result.append(cube(i))
return result
def cube(x):
return x**3
However when I added multi-processing to the implementation , it started failing.
import unittest
from unittest.mock import patch
import multiprocessing as mp
def side_effect_cube(x):
return x**3
@patch("test.data.geocode.test_geo_thread.cube", side_effect=side_effect_cube)
def test_sum(mock_cube):
assert find_cube(7) == [1, 8, 27, 64, 125, 216]
assert mock_cube.called
def find_cube(num):
pool = mp.Pool(processes=4)
result = [pool.apply(cube, args=(x,)) for x in range(1, num)]
return result
def cube(x):
return x**3
Below is the error that I see
cls = <class 'multiprocessing.reduction.ForkingPickler'>, obj = (0, 0, <MagicMock name='cube' id='61482936'>, (1,), {}), protocol = None
@classmethod
def dumps(cls, obj, protocol=None):
buf = io.BytesIO()
> cls(buf, protocol).dump(obj) E _pickle.PicklingError: Can't pickle <class 'unittest.mock.MagicMock'>: it's not the same object as unittest.mock.MagicMock
..\..\appdata\local\programs\python\python37\lib\multiprocessing\reduction.py:51: PicklingError
Upvotes: 1
Views: 730
Reputation: 46
This is a unresolved problem.
"Because the class for any individual Mock / MagicMock isn't available at the top level of the mock module I don't think this can be fixed (fundamental pickle limitation)." #139
Here some workarounds:
If you don't need the magic methods, like assert
and return_value
you can create a custom class, based on the issue referenced above, #139:
class PickableMock(Mock):
def __reduce__(self):
return Mock, ()
As stated here, you can use a fork of the multiprocess lib, I recommended to check the thread to see the problems you may face with that.
The solution I used for my use case was to mock the multiprocess.Pool()
, I used as a context manager, and thanks to this thread I was able to mock and assert the method that was called with the correct arguments. Your code will be like:
import multiprocessing
import unittest
from unittest.mock import patch, MagicMock
def cube(x):
return x**3
def find_cube(num):
with multiprocessing.Pool() as executor:
return [executor.apply(cube, args=(x,)) for x in range(1, num)]
class MyTestCase(unittest.TestCase):
@patch("multiprocessing.Pool")
def test_cube(self, pool):
pool.return_value.__enter__.apply = MagicMock()
num = 7
find_cube(num)
for i in range(1, num):
self.assertEqual(i,
pool.return_value.__enter__.mock_calls[i].kwargs['args'][0])
Upvotes: 3