Reputation: 5360
I have a Producer and a Consumer thread (threading.Thread), which share a queue of type Queue.
Producer run:

    while self.running:
        product = produced()  ### I/O operations
        queue.put(product)

Consumer run:

    while self.running or not queue.empty():
        product = queue.get()
        time.sleep(several_seconds)  ###
        consume(product)

Now I need to terminate both threads from the main thread, with the requirement that the queue must be empty (all products consumed) before terminating.
Currently I'm using code like the following to terminate these two threads:

Main thread stop:

    producer.running = False
    producer.join()
    consumer.running = False
    consumer.join()

But I guess it's unsafe if there are more consumers.
In addition, I'm not sure whether the sleep call yields to the producer so that it can produce more products. In fact, I find that the producer keeps "starving", but I'm not sure whether this is the root cause.
Is there a decent way to deal with this case?
Upvotes: 3
Views: 4902
Reputation: 1209
One observation about your code: the consumer blocks in queue.get() for as long as the queue is empty. Ideally you should pass a timeout and handle the Empty exception, as shown below. That way the condition while self.running or not queue.empty() is re-checked after every timeout.

    while self.running or not queue.empty():
        try:
            product = queue.get(timeout=1)
        except Empty:
            # Nothing arrived within the timeout; re-check the loop condition
            continue
        time.sleep(several_seconds)  ###
        consume(product)

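For reference, here is a minimal self-contained sketch of the same timeout-and-retry idea, written for Python 3 (where the module is named queue). The names consume_until_stopped, stop_event, poll_seconds and results are made up for illustration, and appending to a list stands in for the real consume(product) call.

```python
import queue
import threading


def consume_until_stopped(q, stop_event, results, poll_seconds=0.05):
    """Drain q, re-checking the stop condition after every timeout."""
    while not stop_event.is_set() or not q.empty():
        try:
            product = q.get(timeout=poll_seconds)
        except queue.Empty:
            # Nothing arrived in time; go back and re-check the loop condition
            continue
        results.append(product)  # stand-in for consume(product)


q = queue.Queue()
stop_event = threading.Event()
results = []
consumer = threading.Thread(target=consume_until_stopped,
                            args=(q, stop_event, results))
consumer.start()

for item in range(5):
    q.put(item)

# Only signal the consumer once all producing has finished
stop_event.set()
consumer.join()
print(results)  # [0, 1, 2, 3, 4]
```

The ordering matters here: the stop signal is set only after the last put(), so the consumer cannot exit while items are still on their way.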
I simulated your situation by creating producer and consumer threads. Below is sample code that runs with 2 producers and 4 consumers, and it works well. Hope this helps!

    import time
    import threading
    from Queue import Queue, Empty

    """A multi-producer, multi-consumer queue."""


    # A thread that produces data
    class Producer(threading.Thread):
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, verbose=None):
            threading.Thread.__init__(self, group=group, target=target, name=name,
                                      verbose=verbose)
            self.running = True
            self.name = name
            self.args = args
            self.kwargs = kwargs

        def run(self):
            out_q = self.kwargs.get('queue')
            while self.running:
                # Adding some integer
                out_q.put(10)
                # Keeping this thread asleep so it does not iterate too often
                time.sleep(0.1)
            print 'producer {name} terminated\n'.format(name=self.name)


    # A thread that consumes data
    class Consumer(threading.Thread):
        def __init__(self, group=None, target=None, name=None,
                     args=(), kwargs=None, verbose=None):
            threading.Thread.__init__(self, group=group, target=target, name=name,
                                      verbose=verbose)
            self.args = args
            self.kwargs = kwargs
            self.producer_alive = True
            self.name = name

        def run(self):
            in_q = self.kwargs.get('queue')
            # Consumer should die once the producer is dead and the queue is empty.
            while self.producer_alive or not in_q.empty():
                try:
                    data = in_q.get(timeout=1)
                except Empty:
                    # No data within the timeout; re-check the loop condition
                    continue
                # This part you can do anything to consume time
                if isinstance(data, int):
                    # Just doing some work; in fact you could make this one sleep
                    for i in xrange(data + 10**6):
                        pass
                else:
                    pass
            print 'Consumer {name} terminated (Is producer alive={pstatus}, Is Queue empty={qstatus})!\n'.format(
                name=self.name, pstatus=self.producer_alive, qstatus=in_q.empty())


    # Create the shared queue and launch both thread pools
    q = Queue()
    producer_pool, consumer_pool = [], []

    for i in range(1, 3):
        producer_worker = Producer(kwargs={'queue': q}, name=str(i))
        producer_pool.append(producer_worker)
        producer_worker.start()

    for i in xrange(1, 5):
        consumer_worker = Consumer(kwargs={'queue': q}, name=str(i))
        consumer_pool.append(consumer_worker)
        consumer_worker.start()

    while 1:
        control_process = raw_input('> Y/N: ')
        if control_process == 'Y':
            for producer in producer_pool:
                producer.running = False
                # Joining this to make sure all the producers die
                producer.join()
            for consumer in consumer_pool:
                # Ideally consumer should stop once producers die
                consumer.producer_alive = False
            break

Upvotes: 2
Reputation: 18563
You can put a sentinel object in queue to signal end of tasks, causing all consumers to terminate:

    _sentinel = object()

    def producer(queue):
        while running:
            # produce some data
            queue.put(data)
        queue.put(_sentinel)

    def consumer(queue):
        while True:
            data = queue.get()
            if data is _sentinel:
                # put it back so that other consumers see it
                queue.put(_sentinel)
                break
            # Process data

This snippet is shamelessly copied from Python Cookbook 12.3. It uses a _sentinel object to mark the end of the queue. None also works if no task produced by the producer is None, but using a _sentinel is safer for the more general case.
Upvotes: 8
Reputation: 8137
Edit 2:
a) The reason your consumers keep taking so much time is that your loop runs continuously even when you have no data.
b) I added code at the bottom that shows how to handle this.
If I understood you correctly, the producer/consumer is a continuous process, i.e. it is acceptable to delay the shutdown until you exit the current blocking I/O and process the data you received from that.
In that case, to shut down your producer and consumer in an orderly fashion, I would add communication from the main thread to the producer thread to invoke a shutdown. In the most general case, this could be a queue that the main thread can use to queue a "shutdown" code, but in the simple case of a single producer that is to be stopped and never restarted, it could simply be a global shutdown flag.
Your producer should check this shutdown condition (queue or flag) in its main loop right before it would start a blocking I/O operation (i.e. after it has finished sending other data to the consumer queue). If the flag is set, then it should put a special end-of-data code (one that does not look like your normal data) on the queue to tell the consumer that a shutdown is occurring, and then the producer should return (terminate itself).
The consumer should be modified to check for this end-of-data code whenever it pulls data out of the queue. If the end-of-data code is found, it should do an orderly shutdown and return (terminating itself).
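As a rough sketch of the single-producer, single-consumer case just described (Python 3): the names END_OF_DATA, shutdown, work and consumed are made up for illustration, a threading.Event plays the role of the global shutdown flag, and time.sleep stands in for the blocking I/O.

```python
import queue
import threading
import time

END_OF_DATA = object()  # hypothetical end-of-data code; never equal to real data


def producer(out_q, shutdown):
    n = 0
    while not shutdown.is_set():   # checked right before the next blocking I/O
        time.sleep(0.01)           # stand-in for the blocking I/O operation
        out_q.put(n)
        n += 1
    out_q.put(END_OF_DATA)         # tell the consumer that no more data follows


def consumer(in_q, consumed):
    while True:
        item = in_q.get()          # blocks until data arrives, no busy loop
        if item is END_OF_DATA:
            break                  # orderly shutdown
        consumed.append(item)      # stand-in for the real processing


work = queue.Queue()
shutdown = threading.Event()
consumed = []
threads = [threading.Thread(target=producer, args=(work, shutdown)),
           threading.Thread(target=consumer, args=(work, consumed))]
for t in threads:
    t.start()

time.sleep(0.1)   # let the pair run for a moment
shutdown.set()    # main thread requests the shutdown
for t in threads:
    t.join()
```

Because the end-of-data code is queued behind all real data, the consumer only sees it after everything else has been processed, so the queue is empty when both threads have exited.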
If there are multiple consumers, then the producer could queue multiple end-of-data messages -- one for each consumer -- before it shuts down. Since the consumers stop consuming after they read the message, they will all eventually shut down.
Alternatively, if you do not know up-front how many consumers there are, then part of the orderly shut down of the consumer could be re-queueing the end-of-data code.
This will ensure that all consumers eventually see the end-of-data code and shut down, and when all are done, there will be one remaining item in the queue -- the end-of-data code queued by the last consumer.
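Here is a small sketch of that re-queueing variant (Python 3), assuming four consumers and twenty dummy items. END_OF_DATA, work, consumed and leftover are illustrative names only.

```python
import queue
import threading

END_OF_DATA = object()  # hypothetical end-of-data code


def consumer(in_q, consumed, lock):
    while True:
        item = in_q.get()
        if item is END_OF_DATA:
            in_q.put(END_OF_DATA)  # re-queue so the next consumer sees it too
            break
        with lock:
            consumed.append(item)  # stand-in for the real processing


work = queue.Queue()
consumed = []
lock = threading.Lock()
consumers = [threading.Thread(target=consumer, args=(work, consumed, lock))
             for _ in range(4)]
for c in consumers:
    c.start()

for n in range(20):
    work.put(n)
work.put(END_OF_DATA)  # queued once, no matter how many consumers exist

for c in consumers:
    c.join()

# Exactly one item is left over: the code re-queued by the last consumer
leftover = work.get_nowait()
```

The producer side does not need to know the number of consumers here. The cost is the single leftover item, which has to be discarded if the queue is going to be reused.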
EDIT:
The correct way to represent your end-of-data code is highly application dependent, but in many cases a simple None works very well. Since None is a singleton, the consumer can use the very efficient if data is None construct to deal with the end case.
Another possibility that can be even more efficient in some cases is to set up a try/except outside your main consumer loop, arranged so that the exception can only be raised when you try to unpack the end-of-data code, because unpacking works for every normal data item.
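A minimal sketch of that idea (Python 3), assuming every normal item is a (name, amount) tuple and None is the end-of-data code. The record layout and the totals dictionary are made up for illustration.

```python
import queue
import threading


def consumer(in_q, totals):
    try:
        while True:
            # Unpacking works for every normal item; None raises TypeError
            name, amount = in_q.get()
            totals[name] = totals.get(name, 0) + amount
    except TypeError:
        pass  # reached the end-of-data code, shut down


work = queue.Queue()
totals = {}
worker = threading.Thread(target=consumer, args=(work, totals))
worker.start()

for record in [("a", 1), ("b", 2), ("a", 3)]:
    work.put(record)
work.put(None)  # end-of-data code
worker.join()
print(totals)  # {'a': 4, 'b': 2}
```

One caveat with this layout: any other TypeError raised inside the loop (for example, an amount that is not a number) would also end the consumer silently, so it only fits when the processing itself cannot raise that exception.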
EDIT 2:
Combining these concepts with your initial code, now the producer does this:

    while self.running:
        product = produced()  ### I/O operations
        queue.put(product)
    for x in range(number_of_consumers):
        queue.put(None)  # Termination code

Each consumer does this:

    while 1:
        product = queue.get()
        if product is None:
            break
        consume(product)

The main program can then just do this:

    producer.running = False
    producer.join()
    for consumer in consumers:
        consumer.join()

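Put together as something you can actually run (Python 3), it might look like the sketch below. NUMBER_OF_CONSUMERS, the time.sleep stand-in for produced() and the consumed list standing in for consume(product) are assumptions for illustration, not part of your code.

```python
import queue
import threading
import time

NUMBER_OF_CONSUMERS = 3  # assumed; the producer must know this count


class Producer(threading.Thread):
    def __init__(self, out_q):
        super().__init__()
        self.out_q = out_q
        self.running = True

    def run(self):
        n = 0
        while self.running:
            time.sleep(0.01)       # stand-in for produced(), the I/O operation
            self.out_q.put(n)
            n += 1
        for _ in range(NUMBER_OF_CONSUMERS):
            self.out_q.put(None)   # one termination code per consumer


class Consumer(threading.Thread):
    def __init__(self, in_q):
        super().__init__()
        self.in_q = in_q
        self.consumed = []

    def run(self):
        while True:
            product = self.in_q.get()
            if product is None:
                break
            self.consumed.append(product)  # stand-in for consume(product)


work = queue.Queue()
producer = Producer(work)
consumers = [Consumer(work) for _ in range(NUMBER_OF_CONSUMERS)]
for t in [producer] + consumers:
    t.start()

time.sleep(0.1)            # let the threads run for a moment
producer.running = False   # main thread requests the stop
producer.join()
for consumer in consumers:
    consumer.join()

all_consumed = sorted(p for c in consumers for p in c.consumed)
```

Each consumer stops after reading exactly one None, so queueing one per consumer leaves the queue empty once all of them have exited.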
Upvotes: 2