RamPrasadBismil

Reputation: 589

Understanding how to use Multiple threads calling different functions with Queue in python?

So, I have this basic example that I wrote where I use multiple threads and a single Queue, with each thread calling a different function independently and doing a certain task. Does this logic look right, or is there any scope for improvement? I didn't use a separate class for threads as shown in http://www.ibm.com/developerworks/aix/library/au-threadingpython/ because that would unnecessarily complicate the workflow I am trying to implement, in which each thread calls a separate function and puts its result in the same queue, which I can use later to analyze the results.

from Queue import Queue
from threading import Thread

class Scheduler():
    def __init__(self):
        self.id=10

    def add(self,q,id):
        self.id+=id
        q.put('added %d' % self.id)
        q.task_done()

    def mul(self,q,id):
        self.id*=id
        q.put('multiplied : %d' % self.id)
        q.task_done()



if __name__=='__main__':
    id=5
    sch1=Scheduler()
    sch2=Scheduler()
    q= Queue()
    t1=Thread(target=sch1.add, args=(q,id,))
    t1.start()
    t2=Thread(target=sch2.mul, args=(q,id,))
    t2.start()
    print q.get()
    print q.get()
    q.join()

Upvotes: 0

Views: 1134

Answers (1)

dano

Reputation: 94881

There are a couple of issues here:

First, updating self.id via += and *= is not atomic, so if you were to run multiple add and/or mul methods concurrently on the same Scheduler object, self.id could end up being calculated incorrectly because two or more calls step on each other. You can fix that by protecting the update operations with a threading.Lock.
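
As a rough sketch of the lock idea (not necessarily how you would structure your real code), each Scheduler could own a lock and hold it while it reads and updates self.id. The `_lock` attribute, the `run_shared_adds` helper and the `amount`/`factor` parameter names are made up for this illustration, and the import falls back to the Python 2 `Queue` module used in the question:

```python
import threading

try:
    from queue import Queue   # Python 3
except ImportError:
    from Queue import Queue   # Python 2, as in the question


class Scheduler(object):
    def __init__(self):
        self.id = 10
        # Hypothetical attribute: one lock per Scheduler guards self.id.
        self._lock = threading.Lock()

    def add(self, q, amount):
        # Hold the lock for the whole read-modify-write of self.id.
        with self._lock:
            self.id += amount
            result = self.id
        q.put('added %d' % result)

    def mul(self, q, factor):
        with self._lock:
            self.id *= factor
            result = self.id
        q.put('multiplied : %d' % result)


def run_shared_adds(n_threads=4, amount=5):
    """Run several add() calls concurrently on ONE shared Scheduler."""
    sch = Scheduler()
    q = Queue()
    threads = [threading.Thread(target=sch.add, args=(q, amount))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Thread order is not deterministic, so sort the messages.
    return sch.id, sorted(q.get() for _ in range(n_threads))


final_id, messages = run_shared_adds()
```

Because the value is copied into `result` while the lock is still held, each message reports the value produced by its own update, even though the threads may run in any order.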

Second, you're misusing the Queue.task_done/Queue.join methods. The idea behind task_done and join is to have a producer thread put items onto the Queue, and then, after it's added all its work items to the Queue, call queue.join() to wait for all the work items to be processed by one or more consumers. The consumers call queue.get(), process the work item, and then call queue.task_done() to signal that they're done processing the item. You've got this a bit backwards - you're calling queue.put and queue.task_done from the same thread. The way you're using the Queue, it really doesn't make sense to use this pattern - you're just using the Queue to pass results back to the main thread. You might as well just do this:

from Queue import Queue
from threading import Thread

class Scheduler():
    def __init__(self):
        self.id=10

    def add(self,q,id):
        self.id+=id
        q.put('added %d' % self.id)

    def mul(self,q,id):
        self.id*=id
        q.put('multiplied : %d' % self.id)

if __name__=='__main__':
    id=5
    sch1 = Scheduler()
    sch2 = Scheduler()
    q = Queue()
    t1 = Thread(target=sch1.add, args=(q,id,))
    t1.start()
    t2 = Thread(target=sch2.mul, args=(q,id,))
    t2.start()
    print q.get()
    print q.get()
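
For contrast, here is a minimal sketch of the producer/consumer arrangement that task_done and join are meant for. The `consumer` and `process_all` functions, the squaring "work" and the `None` sentinel used to stop the consumers are all assumptions made for the example, not something from your code:

```python
import threading

try:
    from queue import Queue   # Python 3
except ImportError:
    from Queue import Queue   # Python 2, as in the question


def consumer(work_q, results_q):
    """Take items off work_q until the None sentinel arrives."""
    while True:
        item = work_q.get()
        if item is None:
            # Sentinel (an assumption of this sketch): stop this consumer.
            work_q.task_done()
            break
        results_q.put(item * item)   # stand-in for real processing
        # Tell the queue that this particular item is fully processed.
        work_q.task_done()


def process_all(items, n_consumers=2):
    work_q = Queue()
    results_q = Queue()
    consumers = [threading.Thread(target=consumer, args=(work_q, results_q))
                 for _ in range(n_consumers)]
    for t in consumers:
        t.start()

    # Producer side: put every work item on the queue...
    for item in items:
        work_q.put(item)
    # ...then block until task_done() has been called once per item.
    work_q.join()

    # Shut the consumers down with one sentinel each.
    for _ in consumers:
        work_q.put(None)
    for t in consumers:
        t.join()

    # Completion order is not deterministic, so sort the results.
    return sorted(results_q.get() for _ in range(len(items)))


squares = process_all([1, 2, 3, 4])
```

Here the thread that calls put is the one that calls join, and the threads that call get are the ones that call task_done, which is the opposite of the arrangement in the question.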

Upvotes: 1
