Karim Stekelenburg
Karim Stekelenburg

Reputation: 643

How to gracefully terminate a multithreaded Python application that uses queue.Queue

I have been trying to get my application to terminate gracefully for quite some time now, but so far none of the answers I have found worked.

The sample code below illustrates the structure of my application. It is basically a chain of threads that pass data to one another using Queues.

from abc import abstractmethod
from time import sleep
from threading import Thread, Event
from queue import Queue
import signal
import sys


class StoppableThread(Thread):
    """Thread that repeats ``actual_job`` until its stop event is set.

    Every instance owns an ``Event`` (``stopper``) that serves as the stop
    flag and a ``Queue`` (``queue``) that other objects can put items on.
    """

    def __init__(self):
        super().__init__()
        self.queue = Queue()
        self.stopper = Event()

    @abstractmethod
    def actual_job(self):
        """Perform one unit of work; subclasses override this."""

    def stop_running(self):
        """Set the stop event so that ``run`` leaves its loop."""
        self.stopper.set()

    def run(self):
        """Call ``actual_job`` until stopped, then join this thread's queue."""
        while True:
            if self.stopper.is_set():
                break
            # Trace output: the flag's state at the start of each iteration.
            print(self.stopper.is_set())
            self.actual_job()
        # Queue.join() blocks until every item that was put on the queue has
        # been marked as processed with task_done().
        self.queue.join()

class SomeObjectOne(StoppableThread):
    """First stage of the chain: feeds a fixed string to the second stage."""

    def __init__(self, name, some_object_two):
        super().__init__()
        self.obj_two = some_object_two
        self.name = name

    def actual_job(self):
        """Print the string, put it on the next stage's queue, then pause."""
        payload = 'some string'
        print('{} outputs {}'.format(self.name, payload))
        downstream = self.obj_two.queue
        downstream.put(payload)
        sleep(2)

class SomeObjectTwo(StoppableThread):
    """Middle stage of the chain: reverses each string and forwards it."""

    def __init__(self, name, some_object_three):
        super().__init__()
        self.some_object_three = some_object_three
        self.name = name

    def actual_job(self):
        """Take one string from this stage's queue, reverse it, pass it on."""
        received = self.queue.get()
        inverted = received[::-1]
        print('{} outputs {}'.format(self.name, inverted))
        downstream = self.some_object_three.queue
        downstream.put(inverted)
        sleep(2)


class SomeObjectThree(StoppableThread):
    """Last stage of the chain: prints each received string reversed."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def actual_job(self):
        """Announce the stage, take one string from the queue, print it reversed."""
        print('{} is currently running'.format(self.name))
        received = self.queue.get()
        restored = received[::-1]
        print('{} outputs {}'.format(self.name, restored))
        sleep(2)




class ServiceExit(Exception):
    """Raised to trigger a clean exit of all running threads and the main program."""

def service_shutdown(signum, frame):
    """Signal handler: report the signal number, then raise ``ServiceExit``.

    ``signum`` and ``frame`` are the two arguments the ``signal`` module
    passes to a handler; ``frame`` is not used here.
    """
    announcement = 'Caught signal %d' % signum
    print(announcement)
    raise ServiceExit()

# Route both termination signals to the same handler.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, service_shutdown)

if __name__ == '__main__':
    # Build the chain back to front: each stage needs a reference to the
    # stage it feeds.
    thread_three = SomeObjectThree('SomeObjectThree')
    thread_two = SomeObjectTwo('SomeObjectTwo', thread_three)
    thread_one = SomeObjectOne('SomeObjectOne', thread_two)
    consumers_first = (thread_three, thread_two, thread_one)

    try:
        for worker in consumers_first:
            worker.start()

        # Keep the main thread running, otherwise signals are ignored.
        while True:
            sleep(0.5)

    except ServiceExit:
        print('Running service exit')
        # Ask every stage to stop, then wait for them starting with the
        # first stage of the chain.
        for worker in consumers_first:
            worker.stop_running()
        for worker in reversed(consumers_first):
            worker.join()
        sys.exit(0)

Now, if I run this code and press Ctrl-C to terminate it, thread_one seems to join as expected, but the code gets stuck at thread_two.join().

Because thread_one is the only thread whose queue is always empty, I suspect it has something to do with the queue.

Any ideas?

Upvotes: 0

Views: 375

Answers (1)

mnistic
mnistic

Reputation: 11020

In the run() method of StoppableThread you have this:

self.queue.join()

join() is a blocking method:

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

So in order for join() to return, it's not enough to get() an item in the other thread; you must also indicate that it's been processed by calling task_done():

import signal
import sys
from abc import abstractmethod
from queue import Empty, Queue
from threading import Event, Thread
from time import sleep

class StoppableThread(Thread):
    """Thread that repeats ``actual_job`` until stopped, then drains its queue.

    Every instance owns an ``Event`` (``stopper``) that serves as the stop
    flag and a ``Queue`` (``queue``) that other objects can put items on.

    NOTE: ``@abstractmethod`` is not enforced here because the class does not
    use ``ABCMeta``; it only documents that subclasses should override
    ``actual_job``.
    """

    def __init__(self):
        super().__init__()
        self.stopper = Event()
        self.queue = Queue()

    @abstractmethod
    def actual_job(self):
        """Perform one unit of work.

        An implementation that takes an item from ``self.queue`` must call
        ``self.queue.task_done()`` for it; otherwise the final ``join()`` in
        ``run`` never returns.
        """

    def stop_running(self):
        """Set the stop event so that ``run`` leaves its main loop."""
        self.stopper.set()

    def run(self):
        """Run ``actual_job`` until stopped, finish queued items, then return."""
        while not self.stopper.is_set():
            print(self.stopper.is_set())
            self.actual_job()
        # The subclasses in this file consume from their own queue, so an item
        # that is still queued when the stop flag is seen would never be
        # marked done and join() below would block forever. Process whatever
        # is left before joining.
        while not self.queue.empty():
            self.actual_job()
        self.queue.join()

class SomeObjectOne(StoppableThread):
    """Producer stage: puts a fixed string on the second stage's queue."""

    def __init__(self, name, some_object_two):
        super().__init__()
        self.obj_two = some_object_two
        self.name = name

    def actual_job(self):
        """Print the string, hand it to the next stage, then sleep two seconds."""
        message = 'some string'
        print('{} outputs {}'.format(self.name, message))
        target_queue = self.obj_two.queue
        target_queue.put(message)
        sleep(2)

class SomeObjectTwo(StoppableThread):
    """Middle stage of the chain: reverses each string and forwards it."""

    def __init__(self, name, some_object_three):
        super().__init__()
        self.name = name
        self.some_object_three = some_object_three

    def actual_job(self):
        """Process at most one queued string.

        Takes a string from this stage's queue, reverses it, marks the item
        as done and puts the reversed string on the next stage's queue.
        Returns without doing anything if no item arrives within the timeout.
        """
        try:
            # Bounded wait: a plain get() would block forever once the
            # upstream stage has stopped, so the stop flag checked by the
            # caller's loop would never be looked at again.
            some_string = self.queue.get(timeout=0.5)
        except Empty:
            return
        inverted = some_string[::-1]
        print('{} outputs {}'.format(self.name, inverted))
        # Tell the queue this item is finished so that Queue.join() can return.
        self.queue.task_done()
        self.some_object_three.queue.put(inverted)
        sleep(2)

class SomeObjectThree(StoppableThread):
    """Last stage of the chain: prints each received string reversed."""

    def __init__(self, name):
        super().__init__()
        self.name = name

    def actual_job(self):
        """Process at most one queued string.

        Announces the stage, takes a string from this stage's queue, prints
        it reversed and marks the item as done. Returns early if no item
        arrives within the timeout.
        """
        print('{} is currently running'.format(self.name))
        try:
            # Bounded wait: a plain get() would block forever once the
            # upstream stage has stopped, so the stop flag checked by the
            # caller's loop would never be looked at again.
            some_string = self.queue.get(timeout=0.5)
        except Empty:
            return
        print('{} outputs {}'.format(self.name, some_string[::-1]))
        # Tell the queue this item is finished so that Queue.join() can return.
        self.queue.task_done()
        sleep(2)

class ServiceExit(Exception):
    """Custom exception used to trigger the clean exit of all threads and the main program."""

def service_shutdown(signum, frame):
    """Handle a signal by printing its number and raising ``ServiceExit``.

    ``signum`` and ``frame`` are the two arguments the ``signal`` module
    passes to a handler; ``frame`` is not used here.
    """
    notice = 'Caught signal %d' % signum
    print(notice)
    raise ServiceExit()

# Install the same handler for SIGTERM and SIGINT.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, service_shutdown)

if __name__ == '__main__':
    # Build the chain back to front: each stage needs a reference to the
    # stage it feeds.
    thread_three = SomeObjectThree('SomeObjectThree')
    thread_two = SomeObjectTwo('SomeObjectTwo', thread_three)
    thread_one = SomeObjectOne('SomeObjectOne', thread_two)
    stages_last_to_first = [thread_three, thread_two, thread_one]

    try:
        for stage in stages_last_to_first:
            stage.start()

        # Keep the main thread running, otherwise signals are ignored.
        while True:
            sleep(0.5)

    except ServiceExit:
        print('Running service exit')
        # Set every stop flag first, then wait for the stages starting with
        # the first stage of the chain.
        for stage in stages_last_to_first:
            stage.stop_running()
        for stage in stages_last_to_first[::-1]:
            stage.join()

Upvotes: 1

Related Questions