Reputation: 1227
I have a script that executes a certain function by multi-threading. Now, it is of interest to have only as much threads running parallel as having CPU-cores. Now the current code (1:) using the threading.thread statement creates 1000 threads and runs them all simultaneously. I want to turn this into something that runs only a fixed number of threads at the same time (e.g., 8) and puts the rest into a queue till a executing thread/cpu core is free for usage.
1:
import threading
nSim = 1000
def simulation(i):
print(str(threading.current_thread().getName()) + ': '+ str(i))
if __name__ == '__main__':
threads = [threading.Thread(target=simulation,args=(i,)) for i in range(nSim)]
for t in threads:
t.start()
for t in threads:
t.join()
Q1: Is code 2: doing what I described? (multithreading with a max number of threads running simultaneously) Is it correct? (I think so but I'm not 100% sure)
Q2: Now the code initiates 1000 threads at the same time and executes them on 8 threads. Is there a way to only initiate a new thread when a executing thread/cpu core is free for usage (in order that I don't have 990 threadcalls waiting from the beginning to be executed when possible?
Q3: Is there a way to track which cpu-core executed which thread? Just to proof that the code is doing what it should do.
2:
import threading
import multiprocessing
print(multiprocessing.cpu_count())
from concurrent.futures import ThreadPoolExecutor
nSim = 1000
def simulation(i):
print(str(threading.current_thread().getName()) + ': '+ str(i))
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=8) as executor:
for i in range (nSim):
res = executor.submit(simulation, i)
print(res.result())
Upvotes: 0
Views: 874
Reputation: 10959
A1: No, your code submits a task, receives a Future
in res
and then calls result
which waits for the result. Only after previous task was done a new task is given to a thread. Only one of the worker threads is really working at a time.
Take a look at ThreadPool.map
(actually Pool.map
) instead of submit
to distribute tasks among the workers.
A2: Only 8 threads (the number of workers) are used here at most. If using map
the input data of the 1000 tasks may be stored (needs memory) but no additional threads are created.
A3: Not that I know of. A thread is not bound to a core, it may switch between them fast.
Upvotes: 1
Reputation: 3836
A1: In order to limit number of threads which can simultaneously have access to some resource, you can use threading.Semaphore Actually 1000 threads will not give you tremendous speed boost, recomended number of threads per process is mp.cpu_count()*1 or mp.cpu_count()*2 in some articles. Also note that Threads are good for IO operations in python, but not for computing due to GIL.
A2. Why do you need so many threads if you want to run only 8 of them simultaneously? Create just 8 threads and then supply them with Tasks when the Tasks are ready, to do so you need to use queue.Queue() which is thread safe. But in your concrete example you can do just the following to run your test 250 times per thread using while inside simulation function, by the way you do not need Semaphore in the case.
A3. When we are talking about multithreading, you have one process with multiple threads.
import threading
import time
import multiprocessing as mpdef simulation(i, _s):
# s is threading.Semaphore()
with _s:
print(str(threading.current_thread().getName()) + ': ' + str(i))
time.sleep(3)if name == 'main':
print("Cores number: {}".format(mp.cpu_count()))
# recommended number of threading is mp.cpu_count()*1 or mp.cpu_count()*2 in some articles
nSim = 25s = threading.Semaphore(4) # max number of threads which can work simultaneously with resource is 4 threads = [threading.Thread(target=simulation, args=(i, s, )) for i in range(nSim)] for t in threads: t.start() # just to prove that all threads are active in the start and then their number decreases when the work is done for i in range(6): print("Active threads number {}".format(threading.active_count())) time.sleep(3)
Upvotes: 1