Reputation: 4471
Consider the following unittest.TestCase, which implements two versions of the same test, the only difference being that one executes the subTests in parallel using multiprocessing.
import multiprocessing as mp
from unittest import TestCase


class TestBehaviour(TestCase):

    def _test_equals(self, val):
        for target_val in [1, 2]:
            with self.subTest(target=target_val, source=val):
                self.assertEqual(val, target_val)

    def test_equality_parallel(self):
        with mp.Pool(processes=4) as pool:
            pool.map(self._test_equals, [1, 2])
            pool.join()
            pool.close()

    def test_equality(self):
        for val in [1, 2]:
            self._test_equals(val)
The serial version, test_equality, works as expected and produces the following test failures:
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=2, source=1)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "temp.py", line 11, in _test_equals
    self.assertEqual(val, target_val)
AssertionError: 1 != 2
======================================================================
FAIL: test_equality (temp.TestBehaviour) (target=1, source=2)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "temp.py", line 11, in _test_equals
    self.assertEqual(val, target_val)
AssertionError: 2 != 1
On the other hand, test_equality_parallel causes an error as the TestCase cannot be pickled:
Traceback (most recent call last):
  File "temp.py", line 15, in test_equality_parallel
    pool.map(self._test_equals, [1, 2])
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 768, in get
    raise self._value
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
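The likely mechanism is that pickling a bound method such as self._test_equals also pickles the instance it is bound to, and a running TestCase holds a reference (through its result object) to the output stream. The sketch below reproduces the same kind of failure without unittest. The Holder class and try_pickle helper are hypothetical names used only for illustration.

```python
import os
import pickle


class Holder:
    """Hypothetical stand-in for a TestCase that references an open text stream."""

    def __init__(self, stream):
        self.stream = stream

    def work(self, val):
        return val


def try_pickle(obj):
    """Return None if obj pickles cleanly, else the error message."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError) as exc:
        return str(exc)
    return None


with open(os.devnull, "w") as stream:
    holder = Holder(stream)
    # Pickling a bound method pickles the instance it is bound to,
    # and with it every attribute, including the stream.
    error = try_pickle(holder.work)

print(error)
```

The method itself does nothing with the stream; it fails to pickle only because of what its instance carries along.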
Now I know that I could split out _test_equals as a standalone function outside the class; however, I would like to keep the behaviour of subTest to enable better logging (and subsequent debugging) of test failures.
How can I run the tests in parallel, but keep this subTest functionality?
Update
I have also attempted this using pathos.multiprocessing.ProcessingPool to circumvent the issues with TestCase serialization; however, in this case pool.join() raises ValueError: Pool is still running.
from pathos.multiprocessing import ProcessingPool
...
    def test_equality_parallel(self):
        pool = ProcessingPool(processes=4)
        pool.map(self._test_equals, [1, 2])
        pool.join()
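For the standard library pools, "Pool is still running" is what join() raises when the pool has not been closed or terminated first. Whether pathos behaves identically is an assumption here, so the sketch below uses multiprocessing.pool.ThreadPool from the standard library to show the ordering. The function names are hypothetical.

```python
from multiprocessing.pool import ThreadPool


def join_without_close():
    """Call join() on a pool that is still accepting work."""
    pool = ThreadPool(processes=2)
    try:
        pool.join()
    except ValueError as exc:
        return str(exc)
    finally:
        pool.terminate()  # always release the worker threads
    return None


def close_then_join():
    """Close the pool first, then wait for the workers to finish."""
    pool = ThreadPool(processes=2)
    results = pool.map(abs, [-1, -2])
    pool.close()  # no more tasks will be submitted
    pool.join()   # now join() is allowed
    return results


print(join_without_close())
print(close_then_join())
```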
Update 2
This question is definitely relevant. The first solution proposed, creating a second class for the methods to be called from the child process, isn't appropriate as it wouldn't enable the use of subTest. The second, removing the unpickleable _Outcome object from TestCase, seems hacky and, given that the child processes are running subTests, also looks unsuitable.
Upvotes: 2
Views: 1139
Reputation: 35207
I'm the pathos (and dill and multiprocess) author. You are still seeing a serialization error out of pathos across processes. You could try parallelizing across threads instead, since threads share memory and nothing has to be pickled. Thread parallelism is probably appropriate for this level of function.
import multiprocess.dummy as mp
from unittest import TestCase


class TestBehaviour(TestCase):

    def _test_equals(self, val):
        for target_val in [1, 2]:
            with self.subTest(target=target_val, source=val):
                self.assertEqual(val, target_val)

    def test_equality_parallel(self):
        with mp.Pool(processes=4) as pool:
            pool.map(self._test_equals, [1, 2])
            # close() must come before join(), otherwise join() raises ValueError
            pool.close()
            pool.join()

    def test_equality(self):
        for val in [1, 2]:
            self._test_equals(val)
Which yields:
======================================================================
FAIL: test_equality (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2
======================================================================
FAIL: test_equality_parallel (test_equaltiy.TestBehaviour)
----------------------------------------------------------------------
...[snip]...
AssertionError: 1 != 2
----------------------------------------------------------------------
Ran 2 tests in 0.108s
FAILED (failures=2)
This tells me that you might be able to use a serialization variant from dill (i.e. within dill.settings) to get around the serialization issue. See: https://github.com/uqfoundation/multiprocess/issues/48.
Upvotes: 3