Reputation: 13
For some reason, my program is hanging using multiprocessing and queues, even though I set timeouts and check if the queue is empty. This happens on both Windows and Linux.
There are multiple processes that recieve inputs (here a, b and c) and should send results (here they just send back the inputs a, b and c).
From what I see, after all "arguments are given" they send back results for a and b over and over again, although a and b are provided only once.
import multiprocessing as mp
import queue
class Multithreading:
def __init__(self, n_processes):
self._processes = [
_Thread(name='Process-{}'.format(i))
for i in range(n_processes)]
def __enter__(self):
for process in self._processes:
process.start()
print(f'Started {process.name}')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
for process in self._processes:
process.event_stopped.set()
process.join()
def run(self):
args = ['a', 'b', 'c']
n_calls = len(args)
for i, arg in enumerate(args):
m = i % len(self._processes)
print(f'Setting arguments to {self._processes[m].name}')
is_started = False
while not is_started:
try:
self._processes[m].queue_inputs.put(arg, timeout=0.05)
is_started = True
print(f'Argument given to {self._processes[m].name}')
except queue.Full:
pass
print(f'All arguments given')
for i in range(n_calls):
m = i % len(self._processes)
print(f'Checking result from {self._processes[m].name}')
arg = None
while True:
try:
arg = self._processes[m].queue_results.get(timeout=0.05)
print('Received {}'.format(arg))
break
except queue.Empty:
print(f'Empty in {self._processes[m].name}, arg = {arg}')
pass
class _Thread(mp.Process):
def __init__(self, name):
super().__init__(name=name, target=self._run)
self.queue_inputs = mp.Queue()
self.queue_results = mp.Queue()
self.event_stopped = mp.Event()
def _run(self):
print(f'Running {self.name}')
while not self.event_stopped.is_set():
try:
arg = self.queue_inputs.get(timeout=0.05)
print(f'{self.name} received {arg}')
while not self.event_stopped.is_set():
try:
self.queue_results.put(arg, timeout=0.05)
print(f'{self.name} sent {arg}')
except queue.Full:
pass
except queue.Empty:
pass
if __name__ == '__main__':
for _ in range(100000000):
with Multithreading(n_processes=2) as m:
m.run()
I would expect timeouts of put
and get
methods to raise the according exceptions, but apparently they do not.
Upvotes: 0
Views: 144
Reputation: 148880
The problem is in _Thread._run
:
def _run(self):
print(f'Running {self.name}')
while not self.event_stopped.is_set(): # Ok, loop until event_stopped
try:
arg = self.queue_inputs.get(timeout=0.05) # Ok, try to get an item
print(f'{self.name} received {arg}')
while not self.event_stopped.is_set(): # Oops, what is this loop for???
try:
self.queue_results.put(arg, timeout=0.05)
print(f'{self.name} sent {arg}')
except queue.Full:
pass
except queue.Empty:
pass
Your current code loops infinitely (or until its queue_results
queue become full of event_stopped
is set) on the same item repeatedly adding it to its output queue. Replacing the offending while
with a if
is enough to fix the problem:
...
while not self.event_stopped.is_set(): # Ok, loop until event_stopped
try:
arg = self.queue_inputs.get(timeout=0.05) # Ok, try to get an item
print(f'{self.name} received {arg}')
if not self.event_stopped.is_set():# ignore the item if stopped in the meanwhile
try:
...
Upvotes: 1