shams

Reputation: 3508

Trouble speeding up application using Multiprocessing+Threads in Python

I have a CPU-bound application that I wish to speed up using multiprocessing+threading instead of a pure threaded version. I wrote a simple application to check the performance of my approach and was surprised to see that the multiprocessing and multiprocessing+threaded versions perform worse than both the threaded and serial versions.

In my application I have a work queue that stores all the work. The threads pop off one work item at a time and process it either directly (threaded version) or by passing it into a process. The thread then needs to wait for the result to arrive before proceeding with the next iteration. The reason I need to pop off one work item at a time is that the work is dynamic (not the case in the prototype application code pasted below) and I cannot pre-partition the work and hand it off to each thread/process during creation.

I would like to know what I am doing wrong and how I could speed up my application.

Here are the execution times when I ran it on a 16-core machine:

Version      : 2.7.2
Compiler     : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform     : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor    : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699  ms
mainPureMultiprocessing exec time: 2241.89805984  ms
mainPureThreaded exec time: 309.767007828  ms
mainSerial exec time: 52.3412227631  ms
Terminating

and here is the code I used:

import threading
import multiprocessing
import time
import platform

class ConcurrentQueue:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()

    def push(self, item):
        self.lock.acquire()
        try:
            self.data.append(item)
        finally:
            self.lock.release()
        return

    def pop(self):
        self.lock.acquire()
        result = None
        try:
            length = len(self.data)
            if length > 0:
                result = self.data.pop()
        finally:
            self.lock.release()
        return result

    def isEmpty(self):
        self.lock.acquire()
        try:
            length = len(self.data)
        finally:
            self.lock.release()
        return length == 0


def timeFunc(passedFunc):
    def wrapperFunc(*arg):
        startTime = time.time()
        result = passedFunc(*arg)
        endTime = time.time()
        elapsedTime = (endTime - startTime) * 1000
        print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
        return result
    return wrapperFunc

def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate, 2):
        if candidate % k:
            return False
    return True

def fillQueueWithWork(itemQueue, numItems):
    for item in xrange(numItems, 2 * numItems):
        itemQueue.push(item)


@timeFunc
def mainSerial(numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
    curThread = threading.currentThread()
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return

@timeFunc
def mainPureThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)

    for worker in workers:
        worker.join()

    return
# End: Implement a pure threaded version

# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    queueSize = (numItems/numProcesses) + 10
    for index in xrange(numProcesses):
        jobs = multiprocessing.Queue(queueSize)
        results = multiprocessing.Queue(queueSize)
        loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
        loopProcess.start()
        workers.append((loopProcess, jobs, results))

    processIndex = 0
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        workers[processIndex][1].put_nowait(dataItem)

        processIndex += 1
        if numProcesses == processIndex:
            processIndex = 0

    for worker in workers:
        worker[1].put_nowait(None)

    for worker in workers:
        worker[0].join()

    return
# End: Implement a pure multiprocessing version

# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return

def mpThreadFunc(jobQueue):
    curThread = threading.currentThread()
    threadName = curThread.getName()

    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()

    myProcessName = "Process-" + threadName
    myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
    myProcess.start()

    while True:
        dataItem = jobQueue.pop()
        # put item to allow process to start
        jobs.put_nowait(dataItem)
        # terminate loop if work queue is empty
        if dataItem is None:
            break
        # wait to get result from process
        result = results.get()
        # do something with result
    return

@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)

    for worker in workers:
        worker.join()

    return
# End: Implement a threaded+multiprocessing version

if __name__ == '__main__':

    print 'Version      :', platform.python_version()
    print 'Compiler     :', platform.python_compiler()
    print 'Platform     :', platform.platform()
    print 'Processor    :', platform.processor()

    numThreads = 8
    numItems = 16000 #200000

    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems

    mainMultiprocessAndThreaded(numThreads, numItems)
    mainPureMultiprocessing(numThreads, numItems)
    mainPureThreaded(numThreads, numItems)
    mainSerial(numItems)

    print "Terminating"

Edit: One of my guesses for the slowness is that the Queue.put() calls are busy-waiting instead of relinquishing the GIL. If so, any suggestions on an alternative data structure I should be using?
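For reference, here is a minimal micro-benchmark sketch (the echoWorker helper and the numbers are mine, not part of the application above) that could estimate the per-item round-trip cost of a multiprocessing.Queue pair between the main process and a worker, which is roughly the overhead each iteration of mpThreadFunc pays:

import multiprocessing
import time

def echoWorker(jobs, results):
    # the worker simply echoes each item back until it sees the None sentinel
    while True:
        item = jobs.get()
        if item is None:
            break
        results.put(item)

if __name__ == '__main__':
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    worker = multiprocessing.Process(target=echoWorker, args=(jobs, results))
    worker.start()

    numItems = 10000
    startTime = time.time()
    for i in xrange(numItems):
        jobs.put(i)      # send one item ...
        results.get()    # ... and block until it comes back, like mpThreadFunc does
    elapsedTime = (time.time() - startTime) * 1000
    print 'per-item round trip:', elapsedTime / numItems, 'ms'

    jobs.put(None)
    worker.join()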

Upvotes: 2

Views: 834

Answers (2)

Cory Dolphin

Reputation: 2670

It seems that your function is not computationally intensive enough to outweigh the overhead of multiprocessing. (Note that in Python, multithreading does not increase your computational resources, because of the GIL.)

Your function (checkPrime) is not actually checking for primality; rather, it returns very quickly. Replacing it with a simple (and naive) prime checker, the result is as expected.

However, look at Use Python pool.map to have multiple processes perform operations on a list to see an easy use of multiprocessing. Note that there are built-in types that already perform the task of your queue, such as multiprocessing.Queue; see http://docs.python.org/library/multiprocessing.html#multiprocessing-managers

def checkPrime(candidate):
    # naive prime check to do some real work
    for k in xrange(3, candidate):
        if not candidate % k:
            return False
    return True

and an example 'speedy' implementation:

@timeFunc
def speedy(numThreads, numItems):
    pool = multiprocessing.Pool(numThreads)  # note: with no argument, Pool defaults to one worker per CPU

    for i in xrange(numItems, 2 * numItems):
        pool.apply_async(checkPrime, (i,))  # args must be passed as a tuple
    pool.close()
    pool.join()

Which is nearly twice as fast!

wdolphin@Cory-linuxlaptop:~$ python test.py 
Version      : 2.6.6
Compiler     : GCC 4.4.5
Platform     : Linux-2.6.35-32-generic-x86_64-with-Ubuntu-10.10-maverick
Processor    : 
Num Threads/Processes: 8 ; Num Items: 16000
mainSerial exec time: 5555.76992035  ms
mainMultiprocessAndThreaded exec time: 4721.43602371  ms
mainPureMultiprocessing exec time: 4440.83094597  ms
mainPureThreaded exec time: 10829.3449879  ms
speedy exec time: 1898.72503281  ms
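For the built-in queue mentioned above, here is a rough sketch (it reuses checkPrime and timeFunc from the question; the queueWorker and mainSharedQueue names are mine) of a single shared multiprocessing.Queue feeding the worker processes directly, in place of the hand-rolled ConcurrentQueue:

import multiprocessing

def queueWorker(jobs):
    # each worker pulls items from the shared queue until it sees the None sentinel
    while True:
        item = jobs.get()
        if item is None:
            break
        checkPrime(item)

@timeFunc
def mainSharedQueue(numProcesses, numItems):
    jobs = multiprocessing.Queue()
    for item in xrange(numItems, 2 * numItems):
        jobs.put(item)
    for _ in xrange(numProcesses):
        jobs.put(None)  # one sentinel per worker

    workers = [multiprocessing.Process(target=queueWorker, args=(jobs,))
               for _ in xrange(numProcesses)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()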

Upvotes: 2

DRH

Reputation: 8356

It looks like the computational cost of each item doesn't outweigh the overhead associated with dispatching the work to another thread/process. For example, here are the results I see when running your test application on my machine (very similar to your results):

Version      : 2.7.1
Compiler     : MSC v.1500 32 bit (Intel)
Platform     : Windows-7-6.1.7601-SP1
Processor    : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 1134.00006294  ms
mainPureMultiprocessing exec time: 917.000055313  ms
mainPureThreaded exec time: 111.000061035  ms
mainSerial exec time: 41.0001277924  ms
Terminating

If I modify the work that's being performed to something that's more computationally expensive, for example:

def checkPrime(candidate):
    i = 0
    for k in xrange(1, 10000):
        i += k
    return i < 5000

Then I see results that are more in line with what I think you would expect:

Version      : 2.7.1
Compiler     : MSC v.1500 32 bit (Intel)
Platform     : Windows-7-6.1.7601-SP1
Processor    : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 2190.99998474  ms
mainPureMultiprocessing exec time: 2154.99997139  ms
mainPureThreaded exec time: 16170.0000763  ms
mainSerial exec time: 9143.00012589  ms
Terminating

You may also want to take a look at multiprocessing.Pool. It provides a similar model to what you're describing (multiple worker processes pulling jobs from a common queue). For your example, an implementation may look something like:

@timeFunc
def mainPool(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)

    pool = multiprocessing.Pool(processes=numThreads)
    results = []
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        results.append(pool.apply_async(checkPrime, (dataItem,)))  # args must be a tuple

    pool.close()
    pool.join()

On my machine, with the alternative checkPrime implementation, I'm seeing a result of:

Version      : 2.7.1
Compiler     : MSC v.1500 32 bit (Intel)
Platform     : Windows-7-6.1.7601-SP1
Processor    : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 1600
mainPool exec time: 1530.99989891  ms
Terminating

Since multiprocessing.Pool already provides safe access for inserting work, you could likely eliminate your ConcurrentQueue and submit your dynamic work directly to the Pool.
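As a rough sketch of that last point (again reusing checkPrime and timeFunc from the question; mainPoolDirect is an illustrative name, not part of the original code), the dynamically generated work can be handed straight to the Pool as it appears, with no intermediate queue:

@timeFunc
def mainPoolDirect(numProcesses, numItems):
    pool = multiprocessing.Pool(processes=numProcesses)
    results = []
    for item in xrange(numItems, 2 * numItems):
        # in the real application each item would be produced dynamically here
        results.append(pool.apply_async(checkPrime, (item,)))
    pool.close()
    pool.join()
    return [r.get() for r in results]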

Upvotes: 5
