Reputation: 61
This is my first question on Stack Overflow. Until now I was mostly able to find what I needed here, so thanks a lot for that, by the way.
However, if I try to kill my ProcessPoolExecutor, it just works through the whole queue that was generated (at least I think so). Is there a simple way to immediately clear the queue of a ProcessPoolExecutor?
from concurrent.futures import ProcessPoolExecutor
from time import sleep
from random import randint


def something_fancy():
    # Simulate a task that takes between 0 and 5 seconds.
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)

    def start_procs(self):
        # Queue 300 tasks at once; only 4 run at any time.
        for i in range(300):
            t = self.exe.submit(something_fancy)
            t.add_done_callback(self.done)

    def done(self, f):
        print(f.result())

    def kill(self):
        # Blocks until every queued task has finished.
        self.exe.shutdown()


if __name__ == '__main__':
    work_obj = Work()
    work_obj.start_procs()
    sleep(5)
    work_obj.kill()
So what I want to do is generate a queue of 300 tasks that is worked through by 4 processes. After 5 seconds it should just quit.
I need to use processes because of the GIL, by the way.
Upvotes: 3
Views: 6560
Reputation: 51643
Using shutdown(wait=False), it will return faster. The default for wait is True.
Otherwise, each Future also provides a .cancel() method, which returns False if the future cannot be cancelled.
It will still finish all processing futures though:
If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
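For reference, on Python 3.9 and later shutdown() also accepts cancel_futures=True, which cancels every future that has not started yet; futures that are already running still finish, as the quoted documentation says. A minimal sketch, assuming Python 3.9+; slow_task and run_then_abort are hypothetical names used only for illustration:

```python
import time
from concurrent.futures import ProcessPoolExecutor


def slow_task(n):
    """Stand-in for real work (hypothetical helper)."""
    time.sleep(0.2)
    return n


def run_then_abort(executor, count, run_for):
    """Submit `count` tasks, let them run for `run_for` seconds, then abort.

    Returns (finished, cancelled) counts. The cancel_futures argument
    requires Python 3.9 or later.
    """
    futures = [executor.submit(slow_task, i) for i in range(count)]
    time.sleep(run_for)
    # Pending futures are cancelled; futures already running are allowed to finish.
    executor.shutdown(wait=True, cancel_futures=True)
    cancelled = sum(f.cancelled() for f in futures)
    finished = sum(f.done() and not f.cancelled() for f in futures)
    return finished, cancelled


if __name__ == "__main__":
    finished, cancelled = run_then_abort(ProcessPoolExecutor(4), count=300, run_for=1)
    print("finished:", finished, "cancelled:", cancelled)
```

On older versions the same effect can be approximated by keeping the futures in a list and calling cancel() on each one before shutting down. With a process pool, a few queued tasks beyond the worker count may already count as running and therefore cannot be cancelled.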
If you have a fixed amount of time, you can provide a timeout to map():
map(func, *iterables, timeout=None, chunksize=1)
The timeout can be a float or an int and is specified in seconds.
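Note that the timeout on map() only limits how long the caller waits for results; it does not stop work that is already queued. One way to apply a fixed time budget to submitted futures is concurrent.futures.wait() with a timeout, followed by cancelling whatever has not started. A sketch under those assumptions (run_with_deadline and slow_task are illustrative names, not part of the library):

```python
import time
from concurrent.futures import ProcessPoolExecutor, wait


def slow_task(n):
    """Stand-in for real work (hypothetical helper)."""
    time.sleep(0.2)
    return n * n


def run_with_deadline(executor, args, deadline):
    """Run slow_task over `args`, giving up on unstarted work after `deadline` seconds.

    Returns the sorted results that were available when the deadline expired.
    """
    futures = [executor.submit(slow_task, a) for a in args]
    done, not_done = wait(futures, timeout=deadline)
    for f in not_done:
        f.cancel()  # returns False for futures that are already running
    executor.shutdown(wait=False)  # do not block on the tasks still running
    return sorted(f.result() for f in done)


if __name__ == "__main__":
    results = run_with_deadline(ProcessPoolExecutor(4), range(300), deadline=1)
    print(len(results), "results before the deadline")
```

Tasks that are running when the deadline expires are not interrupted; the program still waits for them at exit, but nothing new is started.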
Upvotes: 1
Reputation: 61
Thanks, Patrick.
With the hint I was able to cancel every pending task by adding the futures to a list and limiting the queue size manually. Without that, too many tasks are still getting launched.
It just seems like there is no API to adjust the queue size, pause the execution, or delete the process queue.
However, the only way to make this work is to run the main object in a thread so the main script can kill it at any time. And I'm still trying to catch the "CancelledError".
This looks pretty "dirty" and not Pythonic to me. I'll take any other suggestions. Thanks a lot.
from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)
        self.futures = []
        self.max_queue = 50
        self.killed = False

    def start_procs(self):
        for i in range(200000):
            # Busy-wait until there is room in the manually limited queue.
            while not self.killed:
                if len(self.futures) <= self.max_queue:
                    t = self.exe.submit(something_fancy)
                    t.add_done_callback(self.done)
                    self.futures.append(t)
                    break

    def done(self, f):
        print(f.result())
        self.futures.remove(f)

    def kill(self):
        self.killed = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError as e:
                print(e)


if __name__ == '__main__':
    work_obj = Work()
    # Submit from a separate thread so the main script can call kill().
    Thread(target=work_obj.start_procs).start()
    sleep(5)
    work_obj.kill()
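The manual list bookkeeping above could also be expressed by capping the number of in-flight futures with a threading.BoundedSemaphore that each done-callback releases, with a threading.Event as the stop flag. This is only a sketch of an alternative, not a feature of the executor; ThrottledSubmitter, its methods and slow_task are hypothetical names:

```python
import threading
import time
from concurrent.futures import ProcessPoolExecutor


def slow_task(n):
    """Stand-in for real work (hypothetical helper)."""
    time.sleep(0.1)
    return n


class ThrottledSubmitter:
    """Keeps at most `max_pending` futures submitted at any time."""

    def __init__(self, executor, max_pending):
        self.executor = executor
        self.slots = threading.BoundedSemaphore(max_pending)
        self.stopped = threading.Event()
        self.results = []
        self.lock = threading.Lock()

    def submit_all(self, func, items):
        for item in items:
            # Wait for a free slot, waking up regularly to check the stop flag.
            while not self.slots.acquire(timeout=0.05):
                if self.stopped.is_set():
                    return
            if self.stopped.is_set():
                self.slots.release()
                return
            future = self.executor.submit(func, item)
            future.add_done_callback(self._done)

    def _done(self, future):
        self.slots.release()  # free the slot for the next submission
        if not future.cancelled() and future.exception() is None:
            with self.lock:
                self.results.append(future.result())

    def stop(self):
        self.stopped.set()


if __name__ == "__main__":
    submitter = ThrottledSubmitter(ProcessPoolExecutor(4), max_pending=12)
    worker = threading.Thread(target=submitter.submit_all,
                              args=(slow_task, range(200000)))
    worker.start()
    time.sleep(1)
    submitter.stop()
    worker.join()
    # Only the few in-flight tasks remain, so this returns quickly.
    submitter.executor.shutdown(wait=True)
    print(len(submitter.results), "tasks finished")
```

Because at most max_pending tasks are ever queued, stopping only has to wait for those few tasks instead of the whole backlog, and the submitting thread blocks on the semaphore rather than spinning.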
Edit:
from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(0.5)
    return 'Hello World, Future was running!'


class Work:
    def __init__(self):
        cpu_usage = 4
        self.exe = ProcessPoolExecutor(cpu_usage)
        self.futures = []
        self.max_queue = cpu_usage*3
        self.stop = False
        self.paused = False

    def start_procs(self):
        for i in range(200000):
            # Busy-wait until there is room in the queue and we are not paused.
            while not self.stop:
                if len(self.futures) <= self.max_queue:
                    if not self.paused:
                        t = self.exe.submit(something_fancy)
                        t.add_done_callback(self._done)
                        self.futures.append(t)
                        break

    def _done(self, f):
        print(f.result())
        self.futures.remove(f)

    def pause(self):
        # Toggle between paused and running.
        self.paused = not self.paused

    def shutdown(self):
        self.stop = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError as e:
                print(e)


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    print('Started')
    sleep(5)
    work_obj.pause()
    print('Paused')
    sleep(5)
    work_obj.pause()
    print('Continue')
    sleep(5)
    work_obj.shutdown()
    print('Shutdown')
That works, but it still doesn't catch the CancelledError and is still pretty dirty.
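A likely reason the except clause never fires: Future.cancel() does not raise CancelledError, it only returns True or False. The exception is raised by result() on a future that was cancelled, which here happens inside the done-callback. A minimal sketch of a callback that handles this; submit_and_cancel, on_done and slow_task are illustrative names:

```python
import time
from concurrent.futures import CancelledError, ProcessPoolExecutor


def slow_task(n):
    """Stand-in for real work (hypothetical helper)."""
    time.sleep(0.2)
    return n


def submit_and_cancel(executor, count):
    """Submit `count` tasks, cancel what can be cancelled, and collect outcomes."""
    outcomes = []

    def on_done(future):
        try:
            outcomes.append(future.result())
        except CancelledError:
            # Raised by result() because the future was cancelled before it started.
            outcomes.append("cancelled")

    futures = [executor.submit(slow_task, i) for i in range(count)]
    for f in futures:
        f.add_done_callback(on_done)
    for f in futures:
        f.cancel()  # returns a bool; never raises CancelledError
    executor.shutdown(wait=True)
    return outcomes


if __name__ == "__main__":
    outcomes = submit_and_cancel(ProcessPoolExecutor(4), 20)
    print(outcomes.count("cancelled"), "of", len(outcomes), "tasks were cancelled")
```

Checking future.cancelled() at the top of the callback works as well and avoids the try/except entirely.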
Upvotes: 0