Cannon

Reputation: 2783

Terminating a thread : Python

In the example below, each time you execute the program it spawns new threads, each with a new ID.

1. How do I terminate all the threads on task completion?
2. How can I assign a name/ID to the threads?

import threading, Queue 

THREAD_LIMIT = 3                 
jobs = Queue.Queue(5)           # This sets up the queue object to use 5 slots 
singlelock = threading.Lock()   # This is a lock so threads don't print through each other 

# list  
inputlist_Values = [ (5,5),(10,4),(78,5),(87,2),(65,4),(10,10),(65,2),(88,95),(44,55),(33,3) ] 

def DoWork(inputlist): 
    print "Inputlist received..."
    print inputlist 

    # Spawn the threads 
    print "Spawning the {0} threads.".format(THREAD_LIMIT) 
    for x in xrange(THREAD_LIMIT): 
        print "Thread {0} started.".format(x) 
        # This is the thread class that we instantiate. 
        worker().start() 

    # Put stuff in queue 
    print "Putting stuff in queue"
    for i in inputlist: 
        # Block if queue is full, and wait 5 seconds. After 5s raise Queue Full error. 
        try: 
            jobs.put(i, block=True, timeout=5) 
        except: 
            singlelock.acquire() 
            print "The queue is full !"
            singlelock.release() 

    # Wait for the threads to finish 
    singlelock.acquire()        # Acquire the lock so we can print 
    print "Waiting for threads to finish."
    singlelock.release()        # Release the lock 
    jobs.join()                 # This command waits for all threads to finish. 

class worker(threading.Thread): 
    def run(self): 
        # run forever 
        while 1: 
            # Try and get a job out of the queue 
            try: 
                job = jobs.get(True,1) 
                singlelock.acquire()        # Acquire the lock 
                print self
                print "Multiplication of {0} with {1} gives {2}".format(job[0],job[1],(job[0]*job[1]))         
                singlelock.release()        # Release the lock 
                # Let the queue know the job is finished. 
                jobs.task_done() 
            except: 
                break           # No more jobs in the queue 


def main():    
    DoWork(inputlist_Values)

Upvotes: 1

Views: 1282

Answers (2)

jfs

Reputation: 414205

How do I terminate all the threads on task completion?

You could put THREAD_LIMIT sentinel values (e.g., None) at the end of the queue and have each thread exit its run() method when it sees one.
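A minimal sketch of the sentinel idea, written for Python 3 (so the module is `queue` rather than the question's `Queue`). The names `SENTINEL`, `worker`, `run_all` and the `results` queue are illustrative assumptions, not part of the original code:

```python
import queue
import threading

THREAD_LIMIT = 3
SENTINEL = None  # hypothetical marker meaning "no more jobs"

def worker(jobs, results):
    while True:
        job = jobs.get()          # blocks until a job or a sentinel arrives
        if job is SENTINEL:
            jobs.task_done()
            break                 # leaving the loop ends the thread
        x, y = job
        results.put(x * y)
        jobs.task_done()

def run_all(pairs):
    jobs = queue.Queue()
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(jobs, results))
               for _ in range(THREAD_LIMIT)]
    for t in threads:
        t.start()
    for pair in pairs:
        jobs.put(pair)
    for _ in threads:
        jobs.put(SENTINEL)        # one sentinel per worker thread
    for t in threads:
        t.join()                  # returns once every worker has exited
    return sorted(results.get() for _ in range(len(pairs)))
```

Each worker consumes exactly one sentinel, so queuing THREAD_LIMIT of them after the real jobs stops every thread once the work is done.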

When your main thread exits, all non-daemon threads are joined, so the program keeps running as long as any of them is alive. Daemon threads are terminated when your program exits.
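As a small illustration of the daemon behaviour, a thread can be marked as a daemon before it is started. The function name `idle_forever` is made up for this sketch:

```python
import threading
import time

def idle_forever():
    # Stand-in for a worker that never returns on its own.
    while True:
        time.sleep(0.1)

background = threading.Thread(target=idle_forever)
background.daemon = True   # must be set before start()
background.start()
# The interpreter can exit even though `background` is still running.
```

Without the `daemon = True` line, the program would not exit, because the main thread would wait for `idle_forever` to return.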

How can I assign name/ID to the threads ?

You can assign a name by passing it to the constructor or by setting .name directly.

The thread identifier .ident is a read-only property that is unique among alive threads. It may be reused if one thread exits and another starts.
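A short sketch of both attributes. The thread name `worker-0` and the helper `report` are illustrative only:

```python
import threading

def report(out):
    current = threading.current_thread()
    out.append((current.name, current.ident))

seen = []
t = threading.Thread(target=report, args=(seen,), name="worker-0")
before_start = t.ident        # None until the thread has been started
t.start()
t.join()
t.name = "worker-0-finished"  # the name can be changed at any time
```

The name is whatever you choose, while `.ident` is assigned by the runtime once the thread starts.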


You could rewrite your code using multiprocessing.dummy.Pool, which provides the same interface as multiprocessing.Pool but uses threads instead of processes:

#!/usr/bin/env python
import logging
from multiprocessing.dummy import Pool

debug = logging.getLogger(__name__).debug

def work(x_y):
    try:
        x, y = x_y # do some work here
        debug('got %r', x_y)
        return x / y, None
    except Exception as e:
        logging.getLogger(__name__).exception('work%r failed', x_y) 
        return None, e

def main():
    logging.basicConfig(level=logging.DEBUG,
        format="%(levelname)s:%(threadName)s:%(asctime)s %(message)s")

    inputlist = [ (5,5),(10,4),(78,5),(87,2),(65,4),(10,10), (1,0), (0,1) ]
    pool = Pool(3)
    s = 0.
    for result, error in pool.imap_unordered(work, inputlist):
        if error is None:
            s += result
    print("sum=%s" % (s,))
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Output

DEBUG:Thread-1:2013-01-14 15:37:37,253 got (5, 5)
DEBUG:Thread-1:2013-01-14 15:37:37,253 got (87, 2)
DEBUG:Thread-1:2013-01-14 15:37:37,253 got (65, 4)
DEBUG:Thread-1:2013-01-14 15:37:37,254 got (10, 10)
DEBUG:Thread-1:2013-01-14 15:37:37,254 got (1, 0)
ERROR:Thread-1:2013-01-14 15:37:37,254 work(1, 0) failed
Traceback (most recent call last):
  File "prog.py", line 11, in work
    return x / y, None
ZeroDivisionError: integer division or modulo by zero
DEBUG:Thread-1:2013-01-14 15:37:37,254 got (0, 1)
DEBUG:Thread-3:2013-01-14 15:37:37,253 got (10, 4)
DEBUG:Thread-2:2013-01-14 15:37:37,253 got (78, 5)
sum=78.0

Upvotes: 1

asermax

Reputation: 3123

Threads don't stop unless you tell them to stop.

My recommendation is that you add a stop variable into your Thread subclass, and check whether this variable is True or not in your run loop (instead of while 1:).

An example:

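import threading

class worker(threading.Thread):
    def __init__(self):
        # Initialize the base Thread first; otherwise start() raises RuntimeError
        threading.Thread.__init__(self)
        # Avoid the name _stop: some Python 3 versions of Thread
        # define an internal _stop() method
        self._stop_requested = False

    def stop(self):
        self._stop_requested = True

    def run(self):
        # run until stopped
        while not self._stop_requested:
            pass  # do work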

Then when your program is quitting (for whatever reason) you have to make sure to call the stop method on all your working threads.
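One way this shutdown step might look, sketched for Python 3. Note that it uses a `threading.Event` in place of the plain boolean flag, which is a variation on the approach above rather than the same code. The class name `StoppableWorker` and the `iterations` counter are hypothetical:

```python
import threading

class StoppableWorker(threading.Thread):
    def __init__(self):
        super().__init__()
        self._stop_event = threading.Event()
        self.iterations = 0

    def stop(self):
        self._stop_event.set()

    def run(self):
        while not self._stop_event.is_set():
            self.iterations += 1
            # Stand-in for real work; wakes up early if stop() is called.
            self._stop_event.wait(0.01)

workers = [StoppableWorker() for _ in range(3)]
for w in workers:
    w.start()

# On shutdown: ask every worker to stop, then wait for each one to finish.
for w in workers:
    w.stop()
for w in workers:
    w.join()
```

An Event also lets a worker wait with a timeout and still react promptly to `stop()`, which a bare flag combined with `time.sleep` would not.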

About your second question, doesn't adding a name variable to your Thread subclass work for you?

Upvotes: 0
