Hongxu Chen

Reputation: 5360

How to terminate Producer-Consumer threads from main thread in Python?

I have a Producer and a Consumer thread (threading.Thread), which share a queue of type Queue.

Producer run:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)

Consumer run:

while self.running or not queue.empty():
    product = queue.get()
    time.sleep(several_seconds) ###
    consume(product)

Now I need to terminate both threads from main thread, with the requirement that queue must be empty (all consumed) before terminating.

Currently I'm using code like below to terminate these two threads:

main thread stop:

producer.running = False
producer.join()
consumer.running = False
consumer.join()

But I guess it's unsafe if there are more consumers.

In addition, I'm not sure whether the sleep will release schedule to the producer so that it can produce more products. In fact, I find the producer keeps "starving" but I'm not sure whether this is the root cause.

Is there a decent way to deal with this case?

Upvotes: 3

Views: 4902

Answers (3)

gsb-eng

Reputation: 1209

One observation about your code: the consumer blocks indefinitely in queue.get() whenever the queue is empty. Ideally you should pass a timeout and handle the Empty exception, as below, so that the condition while self.running or not queue.empty() is re-checked after every timeout.

while self.running or not queue.empty():
    try:
        product = queue.get(timeout=1)
    except Empty:
        continue  # nothing arrived; re-check the loop condition
    time.sleep(several_seconds) ###
    consume(product)

I simulated your situation with producer and consumer threads. Below is sample code that runs with 2 producers and 4 consumers, and it works well. Hope this helps!

import time
import threading

from Queue import Queue, Empty

"""A multi-producer, multi-consumer queue."""

# A thread that produces data
class Producer(threading.Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.running = True
        self.name = name
        self.args = args
        self.kwargs = kwargs

    def run(self):
        out_q = self.kwargs.get('queue')
        while self.running:
            # Adding some integer
            out_q.put(10)
            # Sleep briefly so this thread does not loop too fast
            time.sleep(0.1)

        print 'producer {name} terminated\n'.format(name=self.name)


# A thread that consumes data
class Consumer(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name,
                                  verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        self.producer_alive = True
        self.name = name

    def run(self):
        in_q = self.kwargs.get('queue')

        # The consumer should exit once the producers are dead and the queue is empty.
        while self.producer_alive or not in_q.empty():
            try:
                data = in_q.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; re-check the loop condition
                continue

            # This part you can do anything to consume time
            if isinstance(data, int):
                # just doing some work; in fact, you could sleep here instead
                for i in xrange(data + 10**6):
                    pass
            else:
                pass
        print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
            name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())


# Create the shared queue and launch both thread pools
q = Queue()

producer_pool, consumer_pool = [], []


for i in range(1, 3):
    producer_worker = Producer(kwargs={'queue': q}, name=str(i))
    producer_pool.append(producer_worker)
    producer_worker.start()

for i in xrange(1, 5):
    consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
    consumer_pool.append(consumer_worker)
    consumer_worker.start()

while 1:
    control_process = raw_input('> Y/N: ')
    if control_process == 'Y':
        for producer in producer_pool:
            producer.running = False
            # Joining this to make sure all the producers die
            producer.join()

        for consumer in consumer_pool:
            # Ideally consumer should stop once producers die
            consumer.producer_alive = False

        break

Upvotes: 2

NeoWang

Reputation: 18563

You can put a sentinel object in queue to signal end of tasks, causing all consumers to terminate:

_sentinel = object()

def producer(queue):
    while running:
        # produce some data
        queue.put(data)
    queue.put(_sentinel)

def consumer(queue):
    while True:
        data = queue.get()
        if data is _sentinel:
            # put it back so that other consumers see it
            queue.put(_sentinel)
            break
        # Process data

This snippet is shamelessly copied from Python Cookbook 12.3.

  1. Use a _sentinel to mark end of queue. None also works if no task produced by producer is None, but using a _sentinel is safer for the more general case.
  2. You don't need to put one end marker into the queue for each consumer, and you may not know how many threads are consuming anyway. Just put the sentinel back into the queue when a consumer finds it, so that the other consumers get the signal.
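For anyone who wants to try the pattern end to end, here is a minimal runnable sketch of it. It is written for Python 3 (where the module is `queue` rather than `Queue`) and assumes a single producer. `run_demo`, the `results` list and the fixed item list are hypothetical stand-ins for illustration, not part of the Cookbook recipe.

```python
import queue
import threading

_sentinel = object()  # unique end-of-data marker


def producer(q, items):
    # Stand-in for the real producer loop: emit a fixed list of items.
    for item in items:
        q.put(item)
    q.put(_sentinel)  # exactly one marker, however many consumers exist


def consumer(q, results):
    while True:
        data = q.get()
        if data is _sentinel:
            q.put(_sentinel)  # put it back so that other consumers see it
            break
        results.append(data)  # stand-in for real processing


def run_demo(num_consumers=4, num_items=20):
    q = queue.Queue()
    results = []
    consumers = [threading.Thread(target=consumer, args=(q, results))
                 for _ in range(num_consumers)]
    for t in consumers:
        t.start()
    prod = threading.Thread(target=producer, args=(q, list(range(num_items))))
    prod.start()
    prod.join()
    for t in consumers:
        t.join()
    # Return what was consumed and how many entries are left in the queue.
    return sorted(results), q.qsize()
```

Because the queue is FIFO and the sentinel is queued last, every real item has been taken off the queue before any consumer sees the sentinel. After all consumers exit, the sentinel itself is the only entry left in the queue.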

Upvotes: 8

Patrick Maupin

Reputation: 8137

Edit 2:

a) The reason your consumers keep taking so much time is that your loop runs continuously even when you have no data.

b) I added code at the bottom that shows how to handle this.

If I understood you correctly, the producer/consumer is a continuous process, i.e. it is acceptable to delay the shutdown until you exit the current blocking I/O and process the data you received from that.

In that case, to shut down your producer and consumer in an orderly fashion, I would add communication from the main thread to the producer thread to invoke a shutdown. In the most general case, this could be a queue that the main thread can use to queue a "shutdown" code, but in the simple case of a single producer that is to be stopped and never restarted, it could simply be a global shutdown flag.

Your producer should check this shutdown condition (queue or flag) in its main loop right before it would start a blocking I/O operation (e.g. after you have finished sending other data to the consumer queue). If the flag is set, then it should put a special end-of-data code (that does not look like your normal data) on the queue to tell the consumer that a shut down is occurring, and then the producer should return (terminate itself).

The consumer should be modified to check for this end-of-data code whenever it pulls data out of the queue. If the end-of-data code is found, it should do an orderly shutdown and return (terminating itself).
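As a rough sketch of the general case described above (a control queue from the main thread to the producer, plus an end-of-data code for the consumer), something like the following could work. It is written for Python 3. The names `SHUTDOWN`, `END_OF_DATA`, `read_item` and `run_demo` are hypothetical, and `read_item` merely simulates blocking I/O with a short sleep.

```python
import itertools
import queue
import threading
import time

SHUTDOWN = "shutdown"    # hypothetical control code sent by the main thread
END_OF_DATA = object()   # hypothetical end-of-data code for the consumer


def producer(control_q, data_q, read_item):
    while True:
        # Check for a shutdown request right before the next blocking read.
        try:
            if control_q.get_nowait() == SHUTDOWN:
                break
        except queue.Empty:
            pass  # no request pending; carry on producing
        data_q.put(read_item())
    data_q.put(END_OF_DATA)  # tell the consumer that no more data will come


def consumer(data_q, consumed):
    while True:
        item = data_q.get()
        if item is END_OF_DATA:
            break  # orderly shutdown: everything queued earlier was handled
        consumed.append(item)  # stand-in for real processing


def run_demo():
    control_q, data_q = queue.Queue(), queue.Queue()
    consumed = []
    counter = itertools.count()

    def read_item():
        time.sleep(0.01)  # simulated blocking I/O
        return next(counter)

    p = threading.Thread(target=producer, args=(control_q, data_q, read_item))
    c = threading.Thread(target=consumer, args=(data_q, consumed))
    p.start()
    c.start()
    time.sleep(0.1)          # let the pipeline run for a moment
    control_q.put(SHUTDOWN)  # main thread requests the shutdown
    p.join()
    c.join()
    return consumed, data_q.empty()
```

The main thread only ever talks to the producer. The consumer learns about the shutdown through the data queue itself, which is what guarantees that everything produced before the request is consumed first.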

If there are multiple consumers, then the producer could queue multiple end-of-data messages -- one for each consumer -- before it shuts down. Since the consumers stop consuming after they read the message, they will all eventually shut down.

Alternatively, if you do not know up-front how many consumers there are, then part of the orderly shut down of the consumer could be re-queueing the end-of-data code.

This will ensure that all consumers eventually see the end-of-data code and shut down, and when all are done, there will be one remaining item in the queue -- the end-of-data code queued by the last consumer.

EDIT:

The correct way to represent your end-of-data code is highly application dependent, but in many cases a simple None works very well. Since None is a singleton, the consumer can use the very efficient if data is None construct to deal with the end case.

Another possibility that can be even more efficient in some cases is to set up a try/except outside your main consumer loop, arranged so that the exception is raised only when you try to unpack the end-of-data code, because the unpacking always works for normal data.
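To illustrate the try/except idea, here is a minimal sketch for Python 3. It assumes that normal items are (name, value) pairs and that None is the end-of-data code. The pair format, the `totals` dictionary and `run_demo` are hypothetical, chosen only to make the example runnable.

```python
import queue
import threading


def consumer(q, totals):
    try:
        while True:
            # Normal items are (name, value) pairs. Unpacking the None
            # end-of-data code raises TypeError, which ends the loop.
            name, value = q.get()
            totals[name] = totals.get(name, 0) + value
    except TypeError:
        pass  # end-of-data code reached: fall through and return


def run_demo():
    q = queue.Queue()
    totals = {}
    t = threading.Thread(target=consumer, args=(q, totals))
    t.start()
    for pair in [("a", 1), ("b", 2), ("a", 3)]:
        q.put(pair)
    q.put(None)  # end-of-data code
    t.join()
    return totals
```

The loop body has no explicit per-item check for the end marker. The trade-off is that a TypeError raised by the processing code itself would also end the loop silently, so this only suits cases where the unpacking is the sole place such an exception can come from.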

EDIT 2:

Combining these concepts with your initial code, now the producer does this:

while self.running:
    product = produced() ### I/O operations
    queue.put(product)
for x in range(number_of_consumers):
    queue.put(None)  # Termination code

Each consumer does this:

while 1:
    product = queue.get()
    if product is None:
        break
    consume(product)

The main program can then just do this:

producer.running = False
producer.join()
for consumer in consumers:
    consumer.join()

Upvotes: 2
