I am occasionally getting a RuntimeError (no more than 1% of the time, I would say) when using ThreadPool from multiprocessing.pool in Python.
I have read that this happens if one tries to open hundreds of threads. In my case there should be at most 4 threads, so I am a bit confused about why this is happening.
I previously used ThreadPool with 3 threads in exactly the same environment and never got an error.
My code is:
```python
import time
from multiprocessing.pool import ThreadPool

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched)  # list of dicts
    if pending_updates:
        prio = pending_updates[0]['prio']  # variable number between 0 and 4 (edited from original question)
        if prio > 3:
            qty_threads = 1
        elif prio == 0 or prio == 1:
            qty_threads = 4
        else:
            qty_threads = 3
        pool = ThreadPool(qty_threads)
        pool.map(self.run_update_NEW, pending_updates)  # a list of 6 dicts will be given to the pool of 1, 3 or 4 threads
    else:
        time.sleep(2)
```
And the traceback:

```
...
    pool = ThreadPool(qty_threads)
  File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 789, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/app/.heroku/python/lib/python3.6/multiprocessing/pool.py", line 192, in __init__
    self._task_handler.start()
  File "/app/.heroku/python/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
```
Any ideas what the problem is?
From here I learned about ThreadPoolExecutor, and I have decided to give it a try:
```python
import time
from concurrent.futures import ThreadPoolExecutor

while True:
    qty_fetched = 6
    time.sleep(random_secs(0.5))
    pending_updates = fetch_pending_updates(qty_fetched)  # list of dicts
    if pending_updates:
        prio = 2  # some variable number between 0 and 4
        if prio > 3:
            qty_threads = 1
        elif prio == 0 or prio == 1:
            qty_threads = 4
        else:
            qty_threads = 3
        # the following lines changed
        with ThreadPoolExecutor(max_workers=qty_threads) as e:
            for pu in pending_updates:
                e.submit(self.run_update_NEW, pu)
    else:
        time.sleep(2)
```
I will update the post to say whether this works.
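A minimal sketch of why the executor version should not accumulate threads, using a hypothetical run_update in place of self.run_update_NEW: leaving the with block calls executor.shutdown(wait=True), which waits for the submitted tasks and joins the worker threads before the next iteration. One thing to watch is that an exception raised inside a submitted task is stored on the returned future and only surfaces through result() or exception(), so a loop that ignores the futures also silently ignores failures.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_update(update):
    # Hypothetical stand-in for self.run_update_NEW.
    time.sleep(0.01)
    if update.get("fail"):
        raise ValueError(f"update {update['id']} failed")
    return update["id"]


def process_batch(pending_updates, qty_threads):
    # Leaving the with-block calls executor.shutdown(wait=True),
    # which waits for all submitted tasks and joins the worker threads.
    with ThreadPoolExecutor(max_workers=qty_threads) as executor:
        futures = [executor.submit(run_update, pu) for pu in pending_updates]
    results, errors = [], []
    for future in futures:
        exc = future.exception()  # tasks are already done here, so this does not block
        if exc is None:
            results.append(future.result())
        else:
            errors.append(exc)
    return results, errors


baseline = threading.active_count()
for _ in range(20):
    batch = [{"id": i, "fail": i == 5} for i in range(6)]
    results, errors = process_batch(batch, qty_threads=4)

print(results, len(errors))
print("extra threads:", threading.active_count() - baseline)
```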
Answer:
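One issue I can see in your code is that you create a new pool on every iteration of an infinite while True loop, but you never close it. You keep creating pools, and since you never close and join them, the "old" threads will most likely just hang around while, a fraction of a second later, you create more of them. Note that each ThreadPool starts a few internal handler threads in addition to its workers (your traceback fails while starting one of them, _task_handler), so every pool left open costs more than qty_threads threads. My guess is that you eventually exhaust your resources and hit a process or kernel limit somewhere.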
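A quick way to see this is to count live threads with threading.active_count() after running the same kind of loop with and without closing each pool. This is a sketch, not your code: noop and run_batches are hypothetical helpers, and the demo keeps references to the unclosed pools so the count is deterministic. In your loop the references are dropped, and whether such a pool is ever cleaned up depends on the Python version (in 3.6 its worker-handler thread holds a reference to the pool, so it is not). Expect noticeably more than 4 extra threads per unclosed pool, since the handler threads count too; the exact number is an implementation detail.

```python
import threading
from multiprocessing.pool import ThreadPool


def noop(x):
    return x


def run_batches(n_batches, qty_threads, close_pools):
    """Create one ThreadPool per batch, like the loop in the question."""
    pools = []
    for _ in range(n_batches):
        pool = ThreadPool(qty_threads)
        pool.map(noop, range(6))
        if close_pools:
            pool.close()  # no more work will be submitted to this pool
            pool.join()   # wait for its worker and handler threads to exit
        else:
            pools.append(pool)  # keep a reference so the demo is deterministic
    return pools


baseline = threading.active_count()
leaked_pools = run_batches(10, 4, close_pools=False)
leaked = threading.active_count() - baseline
print("extra threads after 10 unclosed pools:", leaked)

# Clean up the demo's leaked pools before the second measurement.
for pool in leaked_pools:
    pool.close()
    pool.join()

baseline = threading.active_count()
run_batches(10, 4, close_pools=True)
remaining = threading.active_count() - baseline
print("extra threads after 10 closed pools:", remaining)
```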
I would move the pool creation outside the while loop and keep reusing the same pool inside it. That is the whole idea of a pool: to have processes or threads waiting for work to appear, which removes the process/thread creation overhead when launching repetitive tasks.
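A minimal sketch of that restructuring, assuming hypothetical stand-ins for fetch_pending_updates and self.run_update_NEW and a bounded loop instead of while True so it can finish. To keep the question's priority-to-thread-count mapping, it creates one pool per size (1, 3 and 4 threads) once, before the loop, and picks the right one per batch; the total number of threads is then fixed no matter how many iterations run.

```python
import random
import time
from multiprocessing.pool import ThreadPool


def fetch_pending_updates(qty):
    # Hypothetical stand-in for the question's fetch_pending_updates().
    return [{"id": i, "prio": random.randint(0, 4)} for i in range(qty)]


def run_update(update):
    # Hypothetical stand-in for self.run_update_NEW.
    time.sleep(0.01)
    return update["id"]


def threads_for_prio(prio):
    # Same mapping as in the question.
    if prio > 3:
        return 1
    elif prio in (0, 1):
        return 4
    return 3


def main(iterations=5):
    # Create each pool size once, before the loop, and reuse it.
    pools = {n: ThreadPool(n) for n in (1, 3, 4)}
    processed = []
    try:
        for _ in range(iterations):  # bounded here; the real loop is `while True`
            pending_updates = fetch_pending_updates(6)
            if pending_updates:
                pool = pools[threads_for_prio(pending_updates[0]["prio"])]
                processed.extend(pool.map(run_update, pending_updates))
            else:
                time.sleep(2)
    finally:
        # Shut every pool down cleanly when the loop ends (or raises).
        for pool in pools.values():
            pool.close()
            pool.join()
    return processed


if __name__ == "__main__":
    print(len(main()))
```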
If there is a reason to relaunch the pool (I cannot figure out what that could be; if you need to renew your workers occasionally, multiprocessing.Pool accepts a maxtasksperchild argument in its declaration, although ThreadPool does not), then at least close and join the old pool properly, since you will not be feeding any more work to it.
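If you do keep a fresh pool per batch, a sketch of closing it properly could look like this (process_with_fresh_pool and run_update are hypothetical names). It calls close() and join() explicitly in a finally block, because using the pool as a context manager (with ThreadPool(n) as pool:) calls terminate() on exit rather than close()/join(), and terminate() does not wait for ThreadPool's worker threads to finish.

```python
import threading
from multiprocessing.pool import ThreadPool


def run_update(update):
    # Hypothetical stand-in for self.run_update_NEW.
    return update["id"] * 2


def process_with_fresh_pool(pending_updates, qty_threads):
    pool = ThreadPool(qty_threads)
    try:
        return pool.map(run_update, pending_updates)
    finally:
        # close(): no more tasks will be submitted; join(): wait for the
        # worker and handler threads to exit so nothing is left behind.
        pool.close()
        pool.join()


if __name__ == "__main__":
    before = threading.active_count()
    for _ in range(100):
        process_with_fresh_pool([{"id": i} for i in range(6)], 4)
    print("extra threads:", threading.active_count() - before)
```

Even after 100 batches, the thread count should stay flat, because each pool is fully shut down before the next one is created.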