Reputation: 21
I have a phantom problem with one of my unit tests.
I use a ThreadPool
from multiprocessing
package for wrapping stdout
and stderr
funtions from my class utilizing paramiko
. During creation I made some real life tests using code below and it is working nicely. But during writing unit test for that code I managed to get into problem, that this usage of ThreadPool
hangs out.
This part hangs out for like 95 percent of time and somehow sometimes executes properly.
while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()): time.sleep(WAIT_FOR_DATA)
I've checked the values during debugging and I've found out that sometimes there is one or other condition set to finished but the other is not. But both functions are already finished so the results is just asking for the state that is never changed in the future.
The code for reproduce (with functionality necessary for this issue):
import time
from multiprocessing.pool import ThreadPool
class ExecResult(object):
def __init__(self, command=None, exit_status_func=None,
receive_stdout_func=None, receive_stderr_func=None,
connection=None):
self.connection = connection
self.stdout = None
self.stderr = None
self.ecode = None
self.ts_stop = None
self._exit_status_f = exit_status_func
self.result_available = False
self.__fetch_streams(receive_stdout_func, receive_stderr_func)
def wait_for_data(self):
WAIT_FOR_DATA = 0.1
if not self.result_available:
# Here it hangs out for 95 percent
while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()):
time.sleep(WAIT_FOR_DATA)
self.result_available = True
self.ts_stop = time.time()
self.stdout = self.__stdout_async_r.get(timeout=2)
self.stderr = self.__stderr_async_r.get(timeout=2)
self.ecode = self._exit_status_f()
def __fetch_streams(self, stdout_func, stderr_func):
stdout_t = ThreadPool(processes=1)
stderr_t = ThreadPool(processes=1)
self.__stdout_async_r = stdout_t.apply_async(func=stdout_func)
self.__stderr_async_r = stderr_t.apply_async(func=stderr_func)
stdout_t.close()
stderr_t.close()
def stderr():
return "stderr"
def stdout():
return "stdout"
def exit():
return "0"
# actual reproduction
res = ExecResult(None, exit, stdout, stderr, None)
res.wait_for_data() #if are data available get them or wait
print res.stdout
print res.stderr
print res.ecode
Upvotes: 0
Views: 997
Reputation: 21
As it usually is, I found out an answer for this after some time spent cursing and doing some tea.
Solution is to add this after close methods:
stdout_t.join()
stderr_t.join()
So this is the repaired part as whole:
def __fetch_streams(self, stdout_func, stderr_func):
stdout_t = ThreadPool(processes=1)
stderr_t = ThreadPool(processes=1)
self.__stdout_async_r = stdout_t.apply_async(func=stdout_func)
self.__stderr_async_r = stderr_t.apply_async(func=stderr_func)
stdout_t.close()
stderr_t.close()
stdout_t.join()
stderr_t.join()
Upvotes: 2