Reputation: 3508
I have a CPU-bound application that I wish to speed up using multiprocessing+threading instead of the pure threaded version. I wrote a simple application to check the performance of my approach and was surprised to see that the multiprocessing and multiprocessing+threading versions perform worse than both the threaded and serial versions.
In my application I have a work queue that stores all the work. The threads pop off one work item at a time and then process it either directly (threaded version) or by passing it into a process. The thread then needs to wait for the result to arrive before proceeding with the next iteration. The reason I need to pop off one work item at a time is that the work is dynamic (not the case in the prototype code pasted below) and I cannot pre-partition the work and hand it off to each thread/process at creation time.
I would like to know what I am doing wrong and how I could speed up my application.
Here are the execution times when I ran it on a 16-core machine:
Version : 2.7.2
Compiler : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699 ms
mainPureMultiprocessing exec time: 2241.89805984 ms
mainPureThreaded exec time: 309.767007828 ms
mainSerial exec time: 52.3412227631 ms
Terminating
and here is the code I used:
import threading
import multiprocessing
import time
import platform
class ConcurrentQueue:
    def __init__(self):
        self.data = []
        self.lock = threading.Lock()
    def push(self, item):
        self.lock.acquire()
        try:
            self.data.append(item)
        finally:
            self.lock.release()
        return
    def pop(self):
        self.lock.acquire()
        result = None
        try:
            length = len(self.data)
            if length > 0:
                result = self.data.pop()
        finally:
            self.lock.release()
        return result
    def isEmpty(self):
        self.lock.acquire()
        try:
            length = len(self.data)
        finally:
            self.lock.release()
        return length == 0
def timeFunc(passedFunc):
    def wrapperFunc(*arg):
        startTime = time.time()
        result = passedFunc(*arg)
        endTime = time.time()
        elapsedTime = (endTime - startTime) * 1000
        print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
        return result
    return wrapperFunc
def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate, 2):
        if candidate % k:
            return False
    return True
def fillQueueWithWork(itemQueue, numItems):
    for item in xrange(numItems, 2 * numItems):
        itemQueue.push(item)
@timeFunc
def mainSerial(numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return
# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
    curThread = threading.currentThread()
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
    return
@timeFunc
def mainPureThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)
    for worker in workers:
        worker.join()
    return
# End: Implement a pure threaded version
# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        # do work with dataItem
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return
@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    queueSize = (numItems/numProcesses) + 10
    for index in xrange(numProcesses):
        jobs = multiprocessing.Queue(queueSize)
        results = multiprocessing.Queue(queueSize)
        loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
        loopProcess.start()
        workers.append((loopProcess, jobs, results))
    processIndex = 0
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        workers[processIndex][1].put_nowait(dataItem)
        processIndex += 1
        if numProcesses == processIndex:
            processIndex = 0
    for worker in workers:
        worker[1].put_nowait(None)
    for worker in workers:
        worker[0].join()
    return
# End: Implement a pure multiprocessing version
# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
    while True:
        dataItem = jobQueue.get()
        if dataItem is None:
            break
        result = checkPrime(dataItem)
        resultQueue.put_nowait(result)
    return
def mpThreadFunc(jobQueue):
    curThread = threading.currentThread()
    threadName = curThread.getName()
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    myProcessName = "Process-" + threadName
    myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
    myProcess.start()
    while True:
        dataItem = jobQueue.pop()
        # put item to allow process to start
        jobs.put_nowait(dataItem)
        # terminate loop if work queue is empty
        if dataItem is None:
            break
        # wait to get result from process
        result = results.get()
        # do something with result
    return
@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    workers = []
    for index in xrange(numThreads):
        loopName = "Thread-" + str(index)
        loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
        loopThread.start()
        workers.append(loopThread)
    for worker in workers:
        worker.join()
    return
# End: Implement a threaded+multiprocessing version
if __name__ == '__main__':
    print 'Version :', platform.python_version()
    print 'Compiler :', platform.python_compiler()
    print 'Platform :', platform.platform()
    print 'Processor :', platform.processor()
    numThreads = 8
    numItems = 16000 #200000
    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems
    mainMultiprocessAndThreaded(numThreads, numItems)
    mainPureMultiprocessing(numThreads, numItems)
    mainPureThreaded(numThreads, numItems)
    mainSerial(numItems)
    print "Terminating"
Edit: One of my guesses for the slowness is that the Queue.put() calls are busy-waiting instead of relinquishing the GIL. If so, any suggestions on an alternate data structure I should be using?
Upvotes: 2
Views: 834
Reputation: 2670
It seems that your function is not computationally intensive enough to outweigh the overheads of multiprocessing. (Note that in Python, multithreading does not increase your computational resources because of the GIL.)
Your function (checkPrime) is not actually checking for primality; rather, it returns very quickly. Replacing it with a simple (and naive) prime checker, the result is as expected.
However, look at Use Python pool.map to have multiple processes perform operations on a list to see an easy use of multiprocessing. Note that there are built-in types to perform the task of your Queue, such as the Queue provided by multiprocessing; see http://docs.python.org/library/multiprocessing.html#multiprocessing-managers
The naive prime checker:
def checkPrime(candidate):
    # dummy process to do some work
    for k in xrange(3, candidate):
        if not candidate % k:
            return False
    return True
and an example 'speedy' implementation:
@timeFunc
def speedy(numThreads, numItems):
    pool = multiprocessing.Pool(numThreads)  # note: with no argument, the default uses the optimal number of workers
    for i in xrange(numItems, 2 * numItems):
        pool.apply_async(checkPrime, (i,))
    pool.close()
    pool.join()
Which is nearly twice as fast!
wdolphin@Cory-linuxlaptop:~$ python test.py
Version : 2.6.6
Compiler : GCC 4.4.5
Platform : Linux-2.6.35-32-generic-x86_64-with-Ubuntu-10.10-maverick
Processor :
Num Threads/Processes: 8 ; Num Items: 16000
mainSerial exec time: 5555.76992035 ms
mainMultiprocessAndThreaded exec time: 4721.43602371 ms
mainPureMultiprocessing exec time: 4440.83094597 ms
mainPureThreaded exec time: 10829.3449879 ms
speedy exec time: 1898.72503281 ms
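For comparison, the pool.map approach linked above might look roughly like the following. This is a minimal sketch that reuses the timeFunc decorator and the naive checkPrime from above; the speedyMap name and the chunksize value are just illustrative choices, not benchmarked.
@timeFunc
def speedyMap(numThreads, numItems):
    # map the prime check over the same item range across worker processes;
    # chunksize batches items per task to cut down on inter-process overhead
    pool = multiprocessing.Pool(numThreads)
    results = pool.map(checkPrime, xrange(numItems, 2 * numItems), chunksize=100)
    pool.close()
    pool.join()
    return results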
Upvotes: 2
Reputation: 8356
It looks like the computational cost of each item isn't outweighing the overhead associated with dispatching the work to another thread/process. For example, here are the results I'm seeing when running your test application on my machine (very similar to your results):
Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 1134.00006294 ms
mainPureMultiprocessing exec time: 917.000055313 ms
mainPureThreaded exec time: 111.000061035 ms
mainSerial exec time: 41.0001277924 ms
Terminating
If I modify the work that's being performed to something that's more computationally expensive, for example:
def checkPrime(candidate):
    i = 0
    for k in xrange(1, 10000):
        i += k
    return i < 5000
Then I see results that are more in line with what I think you would expect:
Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 2190.99998474 ms
mainPureMultiprocessing exec time: 2154.99997139 ms
mainPureThreaded exec time: 16170.0000763 ms
mainSerial exec time: 9143.00012589 ms
Terminating
You may also want to take a look at multiprocessing.Pool. It provides a similar model to what you're describing (multiple worker processes pulling jobs from a common queue). For your example, an implementation may look something like:
@timeFunc
def mainPool(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    pool = multiprocessing.Pool(processes=numThreads)
    results = []
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        results.append(pool.apply_async(checkPrime, (dataItem,)))
    pool.close()
    pool.join()
On my machine, with the alternative checkPrime implementation, I'm seeing a result of:
Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 1600
mainPool exec time: 1530.99989891 ms
Terminating
Since multiprocessing.Pool already provides safe access for inserting work, you could likely eliminate your ConcurrentQueue and insert your dynamic work directly into the Pool.
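As a rough illustration of that last point, skipping the intermediate queue might look like the sketch below. The mainPoolDirect name is made up, it reuses timeFunc and checkPrime from above, and it assumes the work items can simply be generated (or arrive) one at a time.
@timeFunc
def mainPoolDirect(numThreads, numItems):
    pool = multiprocessing.Pool(processes=numThreads)
    # submit each work item to the pool as it becomes available;
    # apply_async returns immediately with an AsyncResult handle
    handles = [pool.apply_async(checkPrime, (item,))
               for item in xrange(numItems, 2 * numItems)]
    pool.close()
    pool.join()
    # collect the results once the workers have finished
    return [handle.get() for handle in handles]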
Upvotes: 5