Matej Leško

Reputation: 21

ThreadPool from Python's multiprocessing hangs

I have a phantom problem with one of my unit tests. I use a ThreadPool from the multiprocessing package to wrap the stdout and stderr functions of a class that uses paramiko. While developing it I ran some real-life tests with the code below and it worked nicely, but while writing a unit test for that code I ran into a problem: this usage of ThreadPool hangs.
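
For context, the real-life usage looks roughly like this. This is only a hypothetical sketch built on paramiko's standard exec_command API (the host, user and command are made up, and ExecResult is the class shown below):

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect("example.com", username="user")  # hypothetical host/user

# exec_command returns file-like objects whose read() blocks until the
# remote command finishes; those blocking reads are what the ThreadPool wraps
stdin, stdout, stderr = client.exec_command("ls -l")
res = ExecResult(command="ls -l",
                 exit_status_func=stdout.channel.recv_exit_status,
                 receive_stdout_func=stdout.read,
                 receive_stderr_func=stderr.read,
                 connection=client)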

This part hangs roughly 95 percent of the time, yet sometimes it executes properly:

while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()):
    time.sleep(WAIT_FOR_DATA)
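
As an aside, AsyncResult.wait() is the more idiomatic way to block until a call completes and avoids the sleep loop; it would hang here just the same, though, because the real problem is elsewhere:

# wait() blocks until the call completes (an optional timeout can be passed)
self.__stdout_async_r.wait()
self.__stderr_async_r.wait()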

While debugging I checked the values and found that sometimes one of the two conditions reports finished while the other does not, even though both functions have already returned. The loop therefore keeps polling a state that will never change.

The code to reproduce the issue (reduced to the functionality relevant here):

import time
from multiprocessing.pool import ThreadPool

class ExecResult(object):
  def __init__(self, command=None, exit_status_func=None, 
               receive_stdout_func=None, receive_stderr_func=None,
               connection=None):
    self.connection = connection
    self.stdout = None
    self.stderr = None
    self.ecode = None
    self.ts_stop = None
    self._exit_status_f = exit_status_func
    self.result_available = False
    self.__fetch_streams(receive_stdout_func, receive_stderr_func)

  def wait_for_data(self):
    WAIT_FOR_DATA = 0.1

    if not self.result_available:

      # Here it hangs about 95 percent of the time
      while not (self.__stdout_async_r.ready() and self.__stderr_async_r.ready()):
        time.sleep(WAIT_FOR_DATA)

      self.result_available = True
      self.ts_stop = time.time()
      self.stdout = self.__stdout_async_r.get(timeout=2)
      self.stderr = self.__stderr_async_r.get(timeout=2)
      self.ecode = self._exit_status_f()


  def __fetch_streams(self, stdout_func, stderr_func):
    stdout_t = ThreadPool(processes=1)
    stderr_t = ThreadPool(processes=1)

    self.__stdout_async_r = stdout_t.apply_async(func=stdout_func)
    self.__stderr_async_r = stderr_t.apply_async(func=stderr_func)
    stdout_t.close()
    stderr_t.close()

def stderr():
  return "stderr"

def stdout():
  return "stdout"

def exit():
  return "0"

# actual reproduction
res = ExecResult(None, exit, stdout, stderr, None)
res.wait_for_data()  # if the data is available, fetch it; otherwise wait for it
print(res.stdout)
print(res.stderr)
print(res.ecode)
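
When the wait does complete, the three prints show the stub values:

stdout
stderr
0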

Upvotes: 0

Views: 997

Answers (1)

Matej Leško

Reputation: 21

As it usually goes, I found the answer myself after some time spent cursing and making tea.

The solution is to add these calls after the close() calls. My understanding is that without join() the pools are referenced only by local variables, so they can be garbage-collected as soon as __fetch_streams returns, terminating the worker threads before the results are ever marked ready:

stdout_t.join()
stderr_t.join()

So this is the repaired method as a whole:

def __fetch_streams(self, stdout_func, stderr_func):
    stdout_t = ThreadPool(processes=1)
    stderr_t = ThreadPool(processes=1)

    self.__stdout_async_r = stdout_t.apply_async(func=stdout_func)
    self.__stderr_async_r = stderr_t.apply_async(func=stderr_func)
    stdout_t.close()
    stderr_t.close()
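    # join() waits for the submitted calls to finish and, just as
    # importantly, keeps the pools referenced until they do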
    stdout_t.join()
    stderr_t.join()
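
For completeness, the same pattern written with concurrent.futures (Python 3, or the futures backport on Python 2) sidesteps the problem, because the executor keeps its workers referenced until shutdown. This is only a sketch of an alternative (fetch_streams is my own stand-in name), not the original code:

from concurrent.futures import ThreadPoolExecutor

def fetch_streams(stdout_func, stderr_func):
    # the executor's worker threads stay alive until shutdown(),
    # so the futures are always completed before we read them
    executor = ThreadPoolExecutor(max_workers=2)
    stdout_future = executor.submit(stdout_func)
    stderr_future = executor.submit(stderr_func)
    executor.shutdown(wait=True)  # roughly equivalent to close() + join()
    return stdout_future.result(), stderr_future.result()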

Upvotes: 2
