Sergeant Salty
Sergeant Salty

Reputation: 13

Python multiprocessing hangs even if there are timeouts set

For some reason, my program is hanging using multiprocessing and queues, even though I set timeouts and check if the queue is empty. This happens on both Windows and Linux.

There are multiple processes that recieve inputs (here a, b and c) and should send results (here they just send back the inputs a, b and c).

From what I see, after all "arguments are given" they send back results for a and b over and over again, although a and b are provided only once.

import multiprocessing as mp
import queue

class Multithreading:
   def __init__(self, n_processes):
        self._processes = [
            _Thread(name='Process-{}'.format(i))
            for i in range(n_processes)]
    
    def __enter__(self):
        for process in self._processes:
            process.start()
            print(f'Started {process.name}')
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        for process in self._processes:
            process.event_stopped.set()
            process.join()
    
    def run(self):
        args = ['a', 'b', 'c']
    
        n_calls = len(args)
        for i, arg in enumerate(args):
            m = i % len(self._processes)
            print(f'Setting arguments to {self._processes[m].name}')
            is_started = False
            while not is_started:
                try:
                    self._processes[m].queue_inputs.put(arg, timeout=0.05)
                    is_started = True
                    print(f'Argument given to {self._processes[m].name}')
                except queue.Full:
                    pass
    
        print(f'All arguments given')
    
        for i in range(n_calls):
            m = i % len(self._processes)
            print(f'Checking result from {self._processes[m].name}')
            arg = None
            while True:
                try:
                    arg = self._processes[m].queue_results.get(timeout=0.05)
                    print('Received {}'.format(arg))
                    break
                except queue.Empty:
                    print(f'Empty in {self._processes[m].name}, arg = {arg}')
                    pass

class _Thread(mp.Process):
    def __init__(self, name):
        super().__init__(name=name, target=self._run)
        self.queue_inputs = mp.Queue()
        self.queue_results = mp.Queue()
        self.event_stopped = mp.Event()

    def _run(self):
        print(f'Running {self.name}')
        while not self.event_stopped.is_set():
            try:
                arg = self.queue_inputs.get(timeout=0.05)
                print(f'{self.name} received {arg}')
                while not self.event_stopped.is_set():
                    try:
                        self.queue_results.put(arg, timeout=0.05)
                        print(f'{self.name} sent {arg}')
                    except queue.Full:
                        pass
            except queue.Empty:
                pass

if __name__ == '__main__':
    for _ in range(100000000):
        with Multithreading(n_processes=2) as m:
            m.run()

I would expect timeouts of put and get methods to raise the according exceptions, but apparently they do not.

Upvotes: 0

Views: 144

Answers (1)

Serge Ballesta
Serge Ballesta

Reputation: 148880

The problem is in _Thread._run:

def _run(self):
    print(f'Running {self.name}')
    while not self.event_stopped.is_set():             # Ok, loop until event_stopped
        try:
            arg = self.queue_inputs.get(timeout=0.05)  # Ok, try to get an item
            print(f'{self.name} received {arg}')
            while not self.event_stopped.is_set():     # Oops, what is this loop for???
                try:
                    self.queue_results.put(arg, timeout=0.05)
                    print(f'{self.name} sent {arg}')
                except queue.Full:
                    pass
        except queue.Empty:
            pass

Your current code loops infinitely (or until its queue_results queue become full of event_stopped is set) on the same item repeatedly adding it to its output queue. Replacing the offending while with a if is enough to fix the problem:

    ...
    while not self.event_stopped.is_set():             # Ok, loop until event_stopped
        try:
            arg = self.queue_inputs.get(timeout=0.05)  # Ok, try to get an item
            print(f'{self.name} received {arg}')
            if not self.event_stopped.is_set():# ignore the item if stopped in the meanwhile
                try:
                    ...

Upvotes: 1

Related Questions