bigTree

Reputation: 2173

Limiting used resources in multithreading

I have parallelized code that calls myfunc using threading.Thread, as follows:

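import subprocess
import threading

def myfunc(elt, other):
    # Placeholder for the real command: a MATLAB script that takes a while to execute
    subprocess.call("A matlab script that takes a while to execute")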


allThreads = []

for elt in allElts:
    allThreads.append(threading.Thread(target=myfunc,args=(elt,other)))
for t in allThreads:
    t.start()
for t in allThreads:
    t.join()

Because of the large amount of data, I ran into a memory issue: some of my subprocess.call invocations failed because memory could not be allocated. To avoid this, I tried to limit the number of threads executing simultaneously to 8. I changed the code above to the following:

someThreads = []
k = 0
for k in range(len(allElts)):
    if k%8 == 1:
        for t in someThreads:
            t.start()
        for t in someThreads:
            t.join()

        someThreads = []
        someThreads.append(threading.Thread(target=myfunc,args=(allElts[k],other)))

    else:
        someThreads.append(threading.Thread(target=myfunc,args=(allElts[k],other)))
    k += 1

This is supposed to create at most 8 threads at a time and run them. However, the result from this piece of code differs from the one I got before and is clearly wrong. What is wrong with it?

Upvotes: 1

Views: 106

Answers (1)

unutbu

Reputation: 880547

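The threads are only started when k%8 == 1. At that point the accumulated threads are started and joined, and then a new thread is added to a fresh someThreads list, but that thread is not started.

That means that when the loop ends, someThreads still holds at least one thread that never gets started with a call to t.start(). (The k += 1 inside the loop also has no effect, since the for loop reassigns k from range on every iteration.)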
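
If you wanted to keep the batching approach, a minimal sketch of a corrected version might look like the following. It slices the list into chunks of 8 so that the final, possibly shorter, batch is started and joined like all the others. The names run_in_batches and fake_myfunc are hypothetical and exist only to make the sketch runnable; fake_myfunc stands in for the real myfunc. Note that each batch still waits for its slowest thread before the next batch begins.

```python
import threading

MAX_THREADS = 8  # batch size taken from the question


def run_in_batches(func, items, other, batch_size=MAX_THREADS):
    """Start at most batch_size threads at a time and wait for each batch."""
    for start in range(0, len(items), batch_size):
        # The last slice may be shorter than batch_size; it is still run.
        batch = [
            threading.Thread(target=func, args=(elt, other))
            for elt in items[start:start + batch_size]
        ]
        for t in batch:
            t.start()
        for t in batch:
            t.join()


# Hypothetical stand-in for myfunc, only to make the sketch runnable.
done = []


def fake_myfunc(elt, other):
    done.append(elt)  # list.append is thread-safe in CPython


run_in_batches(fake_myfunc, list(range(20)), None)
```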

Instead, use a multiprocessing ThreadPool:

import multiprocessing.pool as mpool

pool = mpool.ThreadPool(8)  # at most 8 worker threads run at once

for elt in allElts:
    pool.apply_async(myfunc, args=(elt, other))

pool.close()
pool.join()
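
One caveat: apply_async stores any exception raised in a worker on the returned AsyncResult object, so a failing call goes unnoticed unless you call .get() on it. Here is a self-contained sketch that keeps the result objects and checks them afterwards. The function fake_myfunc and the active/peak counters are hypothetical; they stand in for myfunc and are there only to show that no more than 8 calls run at once.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

active = 0  # number of calls currently running
peak = 0    # highest concurrency observed
lock = threading.Lock()


def fake_myfunc(elt, other):
    """Hypothetical stand-in for myfunc; records how many calls overlap."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.01)  # simulate a slow subprocess call
    with lock:
        active -= 1
    return elt


pool = ThreadPool(8)
async_results = [
    pool.apply_async(fake_myfunc, args=(elt, None)) for elt in range(20)
]
pool.close()
pool.join()

# .get() returns the worker's return value, or re-raises its exception.
results = [r.get() for r in async_results]
```

Unlike fixed batches, the pool hands a new task to a worker as soon as that worker finishes, so one slow call does not hold up the other seven.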

Upvotes: 5
