Reputation: 3806
In the following code, I have two queues for running different kinds of threads. These threads add to each other's queues recursively (Queue 1 grabs some info, Queue 2 processes it and adds more to Queue 1).
I want to wait until all items in both queues are fully processed. Currently I'm using this code
queue.join()
out_queue.join()
The problem is that when the first queue temporarily runs out of work, its join() returns, so it never sees what queue 2 (the out_queue) adds back to it after that point.
As a workaround I added a time.sleep() call, which is a very hacky fix: within 30 seconds both queues have filled up enough that they never run dry.
What is the standard Python way of fixing this? Do I have to have just one queue, and tag items in it as to which thread they should be handled by?
import threading
import time
import urllib2
import Queue

# req_headers, rows, URL_THREAD_COUNT and MySQLConn are defined elsewhere

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            row = self.queue.get()
            request = urllib2.Request(row[0], None, req_headers)
            # ... some processing ...
            self.out_queue.put([row, http_status, page])
            self.queue.task_done()

class DatamineThread(threading.Thread):
    def __init__(self, out_queue, mysql):
        threading.Thread.__init__(self)
        self.out_queue = out_queue
        self.mysql = mysql

    def run(self):
        while True:
            row = self.out_queue.get()
            # ... some processing ...
            queue.put(newrow)
            self.out_queue.task_done()

for i in range(URL_THREAD_COUNT):
    t = ThreadUrl(queue, out_queue)
    t.setDaemon(True)
    t.start()

# populate queue with data
for row in rows:
    queue.put(row)

# MySQL connector
mysql = MySQLConn(host='localhost', user='root', passwd=None, db='db')

# spawn DatamineThread; if you have multiple, make sure each one has its own MySQL connector
dt = DatamineThread(out_queue, mysql)
dt.setDaemon(True)
dt.start()

time.sleep(30)

# wait on the queues until everything has been processed
queue.join()
out_queue.join()
Upvotes: 7
Views: 7031
Reputation: 3955
Assuming that the two queues are named queue_1 and queue_2.
Correct solution: keep track of the total number of pending work items separately (protected by a lock), then wait until that count reaches zero (using a condition variable).
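A minimal sketch of that counter in Python 3 (the TaskTracker class and its method names are illustrative, not from any library): producers call add() before putting an item on either queue, and workers call done() after finishing an item, having first add()-ed any follow-up items the item spawned, so the count can never reach zero while spawned work is still in flight.

```python
import threading

class TaskTracker:
    """Counts in-flight work items across any number of queues."""
    def __init__(self):
        self._cond = threading.Condition()
        self._pending = 0

    def add(self, n=1):
        # Call BEFORE putting an item on any queue.
        with self._cond:
            self._pending += n

    def done(self):
        # Call after an item is fully processed, and after add()-ing
        # any new items that processing it produced.
        with self._cond:
            self._pending -= 1
            if self._pending == 0:
                self._cond.notify_all()

    def join(self):
        # Block until no work is pending anywhere.
        with self._cond:
            self._cond.wait_for(lambda: self._pending == 0)
```

tracker.join() then replaces the pair of queue joins: it returns only once no item on either queue, or in any worker's hands, remains unfinished.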
Correct solution, but not recommended: use undocumented internal attributes (each queue's mutex and unfinished_tasks):
while True:
    with queue_1.mutex, queue_2.mutex:
        if queue_1.unfinished_tasks == 0 and queue_2.unfinished_tasks == 0:
            break
    queue_1.join()
    queue_2.join()
Incorrect solution:
while not (queue_1.empty() and queue_2.empty()):
    queue_1.join()
    queue_2.join()
It's not correct, because an item can be put into queue_1 between queue_2.join() returning and the next while check; and it's also possible for both queues to be empty while a task is still unfinished (an element has been taken off a queue and is being processed).
For instance, in the code below:
#!/usr/bin/env python3
from threading import Thread
from queue import Queue
import time

queue_1 = Queue()
queue_2 = Queue()

def debug():
    print(queue_1.qsize(), queue_2.qsize())

def run_debug():
    while True:
        time.sleep(0.2)
        debug()

Thread(target=run_debug).start()

def run_1():
    while True:
        value = queue_1.get()
        print("get value", value)
        time.sleep(1)
        if value:
            print("put value", value - 1)
            queue_2.put(value - 1)
            time.sleep(0.5)
        queue_1.task_done()

def run_2():
    while True:
        value = queue_2.get()
        print("get value", value)
        time.sleep(1)
        if value:
            print("put value", value - 1)
            queue_1.put(value - 1)
            time.sleep(0.5)
        queue_2.task_done()

thread_1 = Thread(target=run_1)
thread_2 = Thread(target=run_2)
thread_1.start()
thread_2.start()

queue_1.put(3)

# wait for both queues
while not (queue_1.empty() and queue_2.empty()):
    queue_1.join()
    queue_2.join()
print("done")
# (add code to stop the threads properly)
the output is
get value 3
get value 2
get value 1
done
get value 0
Upvotes: 2
Reputation: 1311
I tried to do something like this recently and came up with this. I check the size of each of the queues and keep going until they are all empty.
inqueue = True
while inqueue:
    time.sleep(5)
    q1 = queue.qsize()
    q2 = out_queue.qsize()
    print("queue: %d, out_queue: %d" % (q1, q2))
    inqueue = q1 or q2
queue.join()
out_queue.join()
Upvotes: 0
Reputation: 15170
Change the workers so that they need a sentinel value to exit, instead of exiting when they don't have any more work in the queue. In the following code the howdy worker reads items from the input queue. If the value is the sentinel (None here, but it could be anything), the worker exits.
As a consequence, you don't need to mess with timeouts, which as you've found can be rather dodgy. Another consequence is that if you have N threads, you have to append N sentinels to the input queue to kill off your workers. Otherwise you'll wind up with a worker who will wait forever. A zombie worker, if you will.
import threading, Queue

def howdy(q):
    for msg in iter(q.get, None):
        print 'howdy,', msg

inq = Queue.Queue()
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
inq.put(None)  # tell worker to exit

thread = threading.Thread(target=howdy, args=[inq])
thread.start()
thread.join()
howdy, whiskey
howdy, syrup
howdy, bitters
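The same pattern scales to several workers by queueing one sentinel per thread. A sketch in Python 3 syntax (where the Queue module is spelled queue); the worker count N and the results list are illustrative additions so the effect is visible without relying on print ordering:

```python
import threading
from queue import Queue  # Python 3 spelling of the Queue module

def howdy(q, out):
    # Run until we pull the sentinel; record what we processed.
    for msg in iter(q.get, None):
        out.append('howdy, ' + msg)

N = 3
inq = Queue()
results = []  # list.append is atomic in CPython, safe for this demo
for word in 'whiskey syrup bitters'.split():
    inq.put(word)
for _ in range(N):
    inq.put(None)  # one sentinel per worker, or one worker waits forever

threads = [threading.Thread(target=howdy, args=[inq, results]) for _ in range(N)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With fewer than N sentinels, at least one join() above would block forever on its zombie worker.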
Upvotes: 1