Reputation: 850
Could somebody tell me why on Earth we still can't use a local function with multiprocessing.Process
from the standard library?
We are in 2022 and this problem still requires copy-pasting code to work around it...
def run_ping_outside():
    """Ping google.com on port 1234 with a 5-second timeout.

    Defined at module level so that it can be handed to
    ``multiprocessing.Process`` as a target. The result of the ping is
    discarded.
    """
    host, port = 'google.com', 1234
    ping_on_port(host, port, timeout=5)
class PingHelperUnitTest(unittest.TestCase):
    """Shows which kind of function can be used as a ``Process`` target."""

    def test_ping_on_port_timeout(self):
        """Run the ping helper in a child process and wait for it to finish."""

        def run_ping_inside():
            self.assertTrue(ping_on_port('google.com', 1234, timeout=5))

        # Doesn't work: a function defined inside the test cannot be pickled,
        # which is needed whenever the Process target has to be sent to the
        # child (e.g. with the "spawn" start method).
        # process = Process(target=run_ping_inside)
        process = Process(target=run_ping_outside)
        process.start()
        # Wait for the child: otherwise the test returns while the ping is
        # still running and the child process is never reaped.
        process.join()
That is a simple example, but the real benefit would be to create a @timeout
decorator for unit testing,
and to be able to test the speed of some parts of our applications.
def timeout(max_timeout_allowed):
    """Build a decorator that fails a call which runs longer than a limit.

    The decorated callable is executed in a child ``Process`` while the
    caller waits for at most ``max_timeout_allowed`` seconds.

    Args:
        max_timeout_allowed: Maximum allowed run time, in seconds.

    Returns:
        A decorator. The decorated callable forwards its positional and
        keyword arguments (including ``self`` for methods) to the child
        process and returns ``None``.

    Raises:
        AssertionError: Raised by the decorated callable when the child is
            still running once the time limit has elapsed. The child is
            terminated first.

    Note:
        The undecorated function is passed to ``Process`` as its target, so
        on start methods that pickle the target (e.g. "spawn") it still has
        to be picklable, and so do its arguments. The child's exit code is
        not inspected, so an exception raised inside the child is not
        propagated to the caller.
    """
    # Local import: keeps this block self-contained without touching the
    # file's import section.
    from functools import wraps

    def _timeout(decorated_function):
        @wraps(decorated_function)
        def timeout_wrapper(*args, **kwargs):
            # Forward the caller's arguments unchanged; packing them into a
            # single (args, kwargs) tuple would hand the target two
            # unexpected positional values.
            process = Process(
                target=decorated_function, args=args, kwargs=kwargs
            )
            process.start()
            # join() returns as soon as the child exits, so a fast function
            # does not cost the full time limit the way sleep() would.
            process.join(max_timeout_allowed)
            if process.is_alive():
                process.terminate()
                # Reap the terminated child so it does not linger as a zombie.
                process.join()
                raise AssertionError(
                    f"Function ran out of time: "
                    f"{max_timeout_allowed} second(s)"
                )

        return timeout_wrapper

    return _timeout
Here is a non-working example of this decorator: it fails because we can't pickle a decorated function...
PS: No, I can't use Thread,
as we are not able to terminate threads...
Upvotes: 0
Views: 215
Reputation: 25249
So, this is a little bit of a hack, but you could do this by saving a copy of the undecorated function under a new name and changing __qualname__
so pickle knows where to find the saved version. Then, in the subprocess, the same decorator will run, registering the undecorated function to the same place.
Here's the code:
from multiprocessing import Process
import time
import pickle
import sys
class TimeoutClosure(object):
    """Callable that runs a function in a child process under a time limit.

    The wrapped function is stored in its module under an alias
    (``old_<name>``) and its ``__qualname__`` is changed to that alias, so
    that pickle can still find the undecorated function by name after the
    original name has been rebound to this wrapper.

    Attributes are plain instance attributes:
        func: The undecorated function that is run in the child process.
        max_timeout_allowed: Time limit in seconds.
    """

    def __init__(self, func, max_timeout_allowed):
        """Wrap ``func`` and register it under its pickle-visible alias.

        Args:
            func: Function to run in a child process.
            max_timeout_allowed: Maximum allowed run time, in seconds.
        """
        self.func = func
        self.max_timeout_allowed = max_timeout_allowed
        self.register_inner_function(func)

    def register_inner_function(self, func):
        """Expose ``func`` in its module under an alias pickle can resolve.

        Args:
            func: The undecorated function to register.
        """
        prefix = 'old_'
        # pickle resolves a qualified name by splitting it on dots, so the
        # alias must be a single flat attribute name; a method's
        # "Class.method" qualname would otherwise be unresolvable.
        saved_name = prefix + func.__qualname__.replace('.', '_')
        # Read __module__ directly instead of going through
        # pickle.whichmodule, which does not appear in pickle's documented
        # API.
        module = sys.modules[func.__module__]
        setattr(module, saved_name, func)
        func.__qualname__ = saved_name

    def __get__(self, instance, owner=None):
        """Bind the wrapper to ``instance`` when used to decorate a method.

        Without this, a decorated method would never receive ``self``.
        Access through the class (``instance is None``) returns the wrapper
        itself.
        """
        if instance is None:
            return self

        def bound(*args, **kwargs):
            return self(instance, *args, **kwargs)

        return bound

    def __call__(self, *args, **kwargs):
        """Run the wrapped function in a child process and enforce the limit.

        Args:
            *args: Positional arguments forwarded to the wrapped function.
            **kwargs: Keyword arguments forwarded to the wrapped function.

        Raises:
            AssertionError: If the child is still running after
                ``max_timeout_allowed`` seconds. The child is terminated
                first.
        """
        process = Process(target=self.func, args=args, kwargs=kwargs)
        process.start()
        # Note: this is faster than time.sleep(), as it exits early if
        # the function finishes early.
        process.join(self.max_timeout_allowed)
        if process.is_alive():
            process.terminate()
            # Reap the terminated child so it does not linger as a zombie.
            process.join()
            # Raised explicitly rather than via ``assert`` so the check is
            # not stripped when Python runs with -O.
            raise AssertionError(
                f"Function ran out of time: "
                f"{self.max_timeout_allowed} second(s)"
            )
def timeout(max_timeout_allowed):
    """Return a decorator that wraps a function in a ``TimeoutClosure``.

    Args:
        max_timeout_allowed: Value passed to ``TimeoutClosure`` as its
            ``max_timeout_allowed`` argument.

    Returns:
        A one-argument callable that takes the function to decorate and
        returns ``TimeoutClosure(func, max_timeout_allowed)``.
    """
    return lambda func: TimeoutClosure(func, max_timeout_allowed)
@timeout(1)
def foo():
    """Demo job: sleeps for 2 seconds under a ``timeout(1)`` decorator."""
    print("Entering foo")
    time.sleep(2)
    print("Leaving foo")


# Only run the demo when executed as a script, not when the module is imported.
if __name__ == '__main__':
    foo()
Upvotes: 1