Reputation: 2783
In the example below, each time you execute the program it spawns new threads, each with a new ID.
1. How do I terminate all the threads on task completion?
2. How can I assign a name/ID to the threads?
import threading, Queue
THREAD_LIMIT = 3
jobs = Queue.Queue(5) # This sets up the queue object to use 5 slots
singlelock = threading.Lock() # This is a lock so threads don't print through each other
# list
inputlist_Values = [ (5,5),(10,4),(78,5),(87,2),(65,4),(10,10),(65,2),(88,95),(44,55),(33,3) ]
def DoWork(inputlist):
    print "Inputlist received..."
    print inputlist
    # Spawn the threads
    print "Spawning the {0} threads.".format(THREAD_LIMIT)
    for x in xrange(THREAD_LIMIT):
        print "Thread {0} started.".format(x)
        # This is the thread class that we instantiate.
        worker().start()
    # Put stuff in queue
    print "Putting stuff in queue"
    for i in inputlist:
        # Block if queue is full, and wait 5 seconds. After 5s raise Queue Full error.
        try:
            jobs.put(i, block=True, timeout=5)
        except:
            singlelock.acquire()
            print "The queue is full !"
            singlelock.release()
    # Wait for the threads to finish
    singlelock.acquire() # Acquire the lock so we can print
    print "Waiting for threads to finish."
    singlelock.release() # Release the lock
    jobs.join() # This command waits for all threads to finish.

class worker(threading.Thread):
    def run(self):
        # run forever
        while 1:
            # Try and get a job out of the queue
            try:
                job = jobs.get(True,1)
                singlelock.acquire() # Acquire the lock
                print self
                print "Multiplication of {0} with {1} gives {2}".format(job[0],job[1],(job[0]*job[1]))
                singlelock.release() # Release the lock
                # Let the queue know the job is finished.
                jobs.task_done()
            except:
                break # No more jobs in the queue

def main():
    DoWork(inputlist_Values)
Upvotes: 1
Views: 1282
Reputation: 414205
How do I terminate all the threads on task completion?
You could put THREAD_LIMIT sentinel values (e.g., None) at the end of the queue and exit the thread's run() method when a thread sees one.
When your main thread exits, all non-daemon threads are joined, so the program keeps running as long as any of them is alive. Daemon threads are terminated when your program exits.
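A minimal sketch of the sentinel approach, written for Python 3 (where the module is called queue rather than Queue). The names SENTINEL, worker, run_all and results are illustrative and not taken from the question's code:

```python
import queue
import threading

THREAD_LIMIT = 3
SENTINEL = None  # illustrative marker meaning "no more work"

def worker(jobs, results, lock):
    while True:
        job = jobs.get()
        if job is SENTINEL:
            jobs.task_done()
            break  # leave the loop so the thread terminates
        x, y = job
        with lock:  # protect the shared results list
            results.append(x * y)
        jobs.task_done()

def run_all(inputlist):
    jobs = queue.Queue()
    results = []
    lock = threading.Lock()
    threads = [threading.Thread(target=worker, args=(jobs, results, lock))
               for _ in range(THREAD_LIMIT)]
    for t in threads:
        t.start()
    for item in inputlist:
        jobs.put(item)
    # One sentinel per thread, queued after all the real jobs
    for _ in threads:
        jobs.put(SENTINEL)
    for t in threads:
        t.join()  # returns once each thread has seen its sentinel
    return sorted(results), [t.is_alive() for t in threads]

if __name__ == "__main__":
    products, alive = run_all([(5, 5), (10, 4), (78, 5)])
    print(products, alive)
```

Each thread consumes exactly one sentinel, so queuing THREAD_LIMIT of them after the real jobs is enough to stop every worker without relying on a get() timeout.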
How can I assign name/ID to the threads ?
You can assign a name by passing it to the constructor or by changing .name directly.
The thread identifier .ident is a read-only property that is unique among alive threads. It may be reused if one thread exits and another starts.
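To illustrate both ways of naming a thread and how .ident behaves, here is a small sketch for Python 3. The helper names report and demo and the thread names are made up for the example:

```python
import threading

def report(seen):
    # Record the name and identifier of the thread running this function
    t = threading.current_thread()
    seen.append((t.name, t.ident))

def demo():
    seen = []
    # Name passed to the constructor
    t1 = threading.Thread(target=report, args=(seen,), name="worker-1")
    # Name assigned afterwards through the .name attribute
    t2 = threading.Thread(target=report, args=(seen,))
    t2.name = "worker-2"
    ident_before_start = t1.ident  # None until the thread has been started
    for t in (t1, t2):
        t.start()
        t.join()  # run one after the other so the order is deterministic
    return ident_before_start, seen

if __name__ == "__main__":
    print(demo())
```

Because t1 has already exited when t2 starts, the two recorded identifiers may or may not be equal, which is why .ident is not a good long-term key for a thread.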
You could rewrite your code using multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes:
#!/usr/bin/env python
import logging
from multiprocessing.dummy import Pool

debug = logging.getLogger(__name__).debug

def work(x_y):
    try:
        x, y = x_y # do some work here
        debug('got %r', x_y)
        return x / y, None
    except Exception as e:
        logging.getLogger(__name__).exception('work%r failed', x_y)
        return None, e

def main():
    logging.basicConfig(level=logging.DEBUG,
        format="%(levelname)s:%(threadName)s:%(asctime)s %(message)s")
    inputlist = [ (5,5),(10,4),(78,5),(87,2),(65,4),(10,10), (1,0), (0,1) ]
    pool = Pool(3)
    s = 0.
    for result, error in pool.imap_unordered(work, inputlist):
        if error is None:
            s += result
    print("sum=%s" % (s,))
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
DEBUG:Thread-1:2013-01-14 15:37:37,253 got (5, 5)
DEBUG:Thread-1:2013-01-14 15:37:37,253 got (87, 2)
DEBUG:Thread-1:2013-01-14 15:37:37,253 got (65, 4)
DEBUG:Thread-1:2013-01-14 15:37:37,254 got (10, 10)
DEBUG:Thread-1:2013-01-14 15:37:37,254 got (1, 0)
ERROR:Thread-1:2013-01-14 15:37:37,254 work(1, 0) failed
Traceback (most recent call last):
  File "prog.py", line 11, in work
    return x / y, None
ZeroDivisionError: integer division or modulo by zero
DEBUG:Thread-1:2013-01-14 15:37:37,254 got (0, 1)
DEBUG:Thread-3:2013-01-14 15:37:37,253 got (10, 4)
DEBUG:Thread-2:2013-01-14 15:37:37,253 got (78, 5)
sum=78.0
Upvotes: 1
Reputation: 3123
Threads don't stop unless you tell them to stop.
My recommendation is that you add a stop variable to your Thread subclass and check whether this variable is True in your run loop (instead of while 1:).
An example:
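import threading

class worker(threading.Thread):
    def __init__(self):
        # Initialise the base class; without this call, start() fails
        threading.Thread.__init__(self)
        # Not called _stop: Python 3's Thread uses that name internally
        self._stop_requested = False

    def stop(self):
        self._stop_requested = True

    def run(self):
        # run until stopped
        while not self._stop_requested:
            pass # do work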
Then, when your program is quitting (for whatever reason), you have to make sure to call the stop method on all your worker threads.
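A variation on the same idea, sketched for Python 3, swaps the boolean flag for a threading.Event, which also gives the loop an interruptible sleep. The class name StoppableWorker, the helper run_and_stop and the iterations counter are invented for this example:

```python
import threading
import time

class StoppableWorker(threading.Thread):
    def __init__(self, name=None):
        super().__init__(name=name)
        self._stop_event = threading.Event()
        self.iterations = 0  # stand-in for real work

    def stop(self):
        # Ask the run loop to finish; safe to call from any thread
        self._stop_event.set()

    def run(self):
        while not self._stop_event.is_set():
            self.iterations += 1
            # Sleeps up to 10 ms but wakes immediately once stop() is called
            self._stop_event.wait(0.01)

def run_and_stop(n_threads=3, run_for=0.05):
    workers = [StoppableWorker(name="worker-%d" % i) for i in range(n_threads)]
    for w in workers:
        w.start()
    time.sleep(run_for)  # let the workers run briefly
    for w in workers:
        w.stop()
    for w in workers:
        w.join()  # wait until every run() has returned
    return workers

if __name__ == "__main__":
    for w in run_and_stop():
        print(w.name, w.is_alive(), w.iterations)
```

Calling stop() on every worker and then join() on every worker ensures no thread is still alive when the main thread moves on.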
About your second question, doesn't adding a name variable to your Thread subclass work for you?
Upvotes: 0