Vibhor Kalra

Reputation: 71

Multi-threading in Python

I am having trouble implementing multi-threading in Python, and the issue is specific to my use case. After going through numerous posts on the topic, I used the most widely suggested approach.

I start by defining my thread class as follows.

class myThread(Thread):
    def __init__(self, graphobj, q):
        Thread.__init__(self)
        self.graphobj = graphobj
        self.q = q
    def run(self):
        improcess(self.graphobj, self.q)

Next, I define the function that does all the required processing.

def improcess(graphobj, q):
    while not exitFlag:
        queueLock.acquire()
        if not q.empty():
            photo_id = q.get()
            queueLock.release()
            # Complete processing
        else:
            queueLock.release()

This is where I am stuck. I can run the code below exactly as it is, outside a function, without any issues. However, once I wrap it in a function as shown, it breaks down.

def train_control(graphobj, photo_ids):
    workQueue = Queue(len(photo_ids))
    for i in range(1,5):
        thread = myThread(graphobj=graphobj, q=workQueue)
        thread.start()
        threads.append(thread)
    queueLock.acquire()
    for photo_id in photo_ids:
        workQueue.put(photo_id)
    queueLock.release()
    while not workQueue.empty():
        pass
    exitFlag = 1
    for t in threads:
        t.join()

By breaking down, I mean that the threads complete their work but never stop waiting, i.e. exitFlag is never set to 1. I am unsure how to make this work.

Unfortunately, our systems are designed such that this piece of code needs to be wrapped in a function that another module can invoke, so pulling it out is not really an option.

Looking forward to hearing from experts on this. Thanks in advance.

Edit: I forgot to mention this in the first draft. I initialize exitFlag globally and set its value to 0.

Below is the minimal, verifiable code snippet that I created to capture this problem:

import threading
import Queue

globvar01 = 5
globvar02 = 7
exitFlag = 0

globlist = []
threads = []

queueLock = threading.Lock()
workQueue = Queue.Queue(16)

class myThread(threading.Thread):
    def __init__(self, threadID, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.q = q
    def run(self):
        print "Starting thread " + str(self.threadID)
        myfunc(self.threadID, self.q)
        print "Exiting thread " + str(self.threadID)

def myfunc(threadID, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            thoughtnum = q.get()
            queueLock.release()
            print "Processing thread " + str(threadID)
            if (thoughtnum < globvar01):
                globlist.append([1,2,3])
            elif (thoughtnum < globvar02):
                globlist.append([2,3,4])
        else:
            queueLock.release()

def controlfunc():
    for i in range(1,5):
        thread = myThread(i, workQueue)
        thread.start()
        threads.append(thread)
    queueLock.acquire()
    for i in range(1,11):
        workQueue.put(i)
    queueLock.release()
    # Wait for queue to empty
    while not workQueue.empty():
        pass
    exitFlag = 1
    # Wait for all threads to complete
    for t in threads:
        t.join()

print "Starting main thread"

controlfunc()

print "Exiting Main Thread"

Upvotes: 2

Views: 181

Answers (2)

Mark Tolonen

Reputation: 177674

From your MCVE, the only thing missing is:

while not workQueue.empty():
    pass
global exitFlag  # Need this or `exitFlag` is a local variable only.
exitFlag = 1
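
The scoping rule behind this fix can be seen in isolation. The following is a minimal sketch, not taken from the question's code: the names `exit_flag`, `set_without_global`, and `set_with_global` are hypothetical and chosen only for illustration. It runs the same way on Python 2 and Python 3.

```python
exit_flag = 0  # module-level flag, analogous to exitFlag in the question

def set_without_global():
    # Assigning to a name inside a function creates a new local variable;
    # the module-level exit_flag is left untouched.
    exit_flag = 1

def set_with_global():
    # The global statement makes the assignment rebind the module-level name.
    global exit_flag
    exit_flag = 1

set_without_global()
flag_after_local_assignment = exit_flag    # still 0

set_with_global()
flag_after_global_assignment = exit_flag   # now 1
```

Reading a global inside a function needs no declaration, which is why the worker threads can see the flag; only assignment requires `global`.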

However, you could eliminate both queueLock and exitFlag by using a sentinel value in the Queue to shut down the worker threads, which also eliminates the spin-waiting. Worker threads sleep on q.get(), and the main thread doesn't have to spin-wait for an empty queue:

#!python2
from __future__ import print_function
import threading
import Queue

debug = 1
console = threading.Lock()

def tprint(*args,**kwargs):
    if debug:
        name = threading.current_thread().getName()
        with console:
            print('{}: '.format(name),end='')
            print(*args,**kwargs)

globvar01 = 5
globvar02 = 7

globlist = []
threads = []

workQueue = Queue.Queue(16)

class myThread(threading.Thread):
    def __init__(self, threadID, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.q = q
    def run(self):
        tprint("Starting thread " + str(self.threadID))
        myfunc(self.threadID, self.q)
        tprint("Exiting thread " + str(self.threadID))

def myfunc(threadID, q):
    while True:
        thoughtnum = q.get()
        tprint("Processing thread " + str(threadID))
        if thoughtnum is None:
            break
        elif thoughtnum < globvar01:
            globlist.append([1,2,3])
        elif thoughtnum < globvar02:
            globlist.append([2,3,4])

def controlfunc():
    for i in range(1,5):
        thread = myThread(i, workQueue)
        thread.start()
        threads.append(thread)
    for i in range(1,11):
        workQueue.put(i)
    # Queue one sentinel per worker so that each thread exits its loop
    for t in threads:
        workQueue.put(None)
    # Wait for all threads to complete
    for t in threads:
        t.join()

tprint("Starting main thread")

controlfunc()

tprint("Exiting Main Thread")

Output:

MainThread: Starting main thread
Thread-1: Starting thread 1
Thread-2: Starting thread 2
Thread-3: Starting thread 3
Thread-4: Starting thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Processing thread 3
Thread-4: Processing thread 4
Thread-1: Processing thread 1
Thread-2: Processing thread 2
Thread-3: Exiting thread 3
Thread-4: Exiting thread 4
Thread-1: Exiting thread 1
Thread-2: Exiting thread 2
MainThread: Exiting Main Thread
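
Since the question requires everything to live inside a function that another module can call, the same sentinel idea can be sketched with no module-level state at all. This is an illustrative variant, not the code above: it assumes Python 3 (where the module is named `queue`), and the names `worker`, `process_all`, and `num_workers`, as well as the doubling step standing in for real processing, are hypothetical.

```python
import queue
import threading

def worker(q, results, results_lock):
    while True:
        item = q.get()               # blocks until an item is available
        if item is None:             # sentinel: no more work for this thread
            break
        with results_lock:
            results.append(item * 2)  # placeholder for the real processing

def process_all(items, num_workers=4):
    # The queue, lock, and thread list are all local to this call,
    # so nothing needs a global declaration or a reset between calls.
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, results, results_lock))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:
        q.put(None)                  # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

Calling `process_all(photo_ids)` from another module would then be self-contained. The sketch assumes `None` never appears as a real work item; if it can, a dedicated sentinel object would be needed instead.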

Upvotes: 1

MrJLP

Reputation: 998

You need to make sure exitFlag is set to 0 (False) before spawning any threads; otherwise, the threads in improcess() won't do anything and the queue will remain non-empty.

This problem could happen if you have exitFlag as a global and it's not cleared from a previous run.
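
One way to rule out a stale flag is to create the flag fresh on every call instead of keeping it as a global. The sketch below swaps the integer flag for a `threading.Event`; it is an assumption-laden illustration rather than the question's code. It targets Python 3 (module `queue`), and the names `worker`, `run_once`, `stop_event`, and the 0.05 second timeout are hypothetical choices.

```python
import queue
import threading

def worker(q, stop_event, processed, lock):
    # Keep going until the stop flag is set AND the queue has been drained.
    while not (stop_event.is_set() and q.empty()):
        try:
            item = q.get(timeout=0.05)  # wake up periodically to recheck the flag
        except queue.Empty:
            continue
        with lock:
            processed.append(item)      # placeholder for the real processing

def run_once(items, num_workers=4):
    stop_event = threading.Event()      # starts cleared on every call
    q = queue.Queue()
    processed = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(q, stop_event, processed, lock))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    stop_event.set()                    # all work is queued; workers may exit once drained
    for t in threads:
        t.join()
    return processed
```

Because the event is local to `run_once`, a second call cannot inherit a flag left set by the first one.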

Upvotes: 0
