I'm using ThreadPoolExecutor to run several tasks concurrently, and this question is about gracefully cancelling the outstanding tasks. It seems that when I cancel outstanding (i.e. not yet running) futures, concurrent.futures.as_completed never yields the cancelled futures, even though its documentation states "[as_completed] yields futures as they complete (finished or cancelled futures)."
Example:
import concurrent.futures
import time

def do_task(t):
    time.sleep(t)
    print('task', t, 'finished')
    return t

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_t = {executor.submit(do_task, t): t for t in range(1, 6)}
    for future in concurrent.futures.as_completed(future_to_t):
        print('tasks done?', [f.done() for f in future_to_t])
        t = future_to_t[future]
        if t == 1:
            executor.shutdown(wait=False, cancel_futures=True)
            print('outstanding tasks are now cancelled')
    print('all tasks done')
print('bye')
In the example I put 5 tasks in the pipeline and cancel the outstanding ones after the first task completes. Since tasks 2, 3 and 4 have already started (there are 3 workers), I expect task 5 to be cancelled and yielded by as_completed, but that never happens. The above code prints:
task 1 finished
tasks done? [True, False, False, False, False]
outstanding tasks are now cancelled
task 2 finished
tasks done? [True, True, False, False, True]
task 3 finished
tasks done? [True, True, True, False, True]
task 4 finished
tasks done? [True, True, True, True, True]
It then hangs and never reaches 'all tasks done'.
I can get around this problem by checking the states of the futures and breaking out of the loop once all of them are done, but that is exactly what I expect as_completed to do for me.
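A minimal sketch of that workaround, assuming Python 3.9+ (where shutdown() accepts cancel_futures). After each yielded future the loop checks done() on every future, which is True for both finished and cancelled futures, and breaks once nothing is left. The sleeps are scaled down (t * 0.05 seconds instead of t) and the name results is illustrative, only so the sketch runs quickly; it works around as_completed rather than fixing it.

```python
import concurrent.futures
import time


def do_task(t):
    time.sleep(t * 0.05)  # scaled-down sleep so the sketch finishes quickly
    return t


results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_t = {executor.submit(do_task, t): t for t in range(1, 6)}
    for future in concurrent.futures.as_completed(future_to_t):
        # Guard in case as_completed() does yield a cancelled future.
        if not future.cancelled():
            results.append(future.result())
        if future_to_t[future] == 1:
            executor.shutdown(wait=False, cancel_futures=True)
        # Futures cancelled by shutdown() may never be yielded, so stop as
        # soon as every future reports done() (finished or cancelled).
        if all(f.done() for f in future_to_t):
            break

print('all tasks done', sorted(results))
```

Whether task 4 ends up in results depends on whether a worker dequeued it before shutdown() drained the queue; task 5 is always cancelled.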
What am I doing wrong? Any insights? Thanks!
Follow-up:
When ThreadPoolExecutor.shutdown() cancels the futures, their state changes to CANCELLED, but not to CANCELLED_AND_NOTIFIED, which is what concurrent.futures.as_completed() waits for. concurrent.futures.as_completed() works as expected if I cancel the futures myself:

for f in future_to_t:
    f.cancel()

instead of calling executor.shutdown(wait=False, cancel_futures=True).
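A minimal sketch of the manual-cancel variant, again with scaled-down sleeps (an assumption for speed). As far as I can tell from CPython's ThreadPoolExecutor, a future cancelled with f.cancel() stays in the work queue; when a worker later dequeues it, the worker calls set_running_or_notify_cancel(), which moves it to CANCELLED_AND_NOTIFIED and wakes as_completed(). shutdown(cancel_futures=True) instead drains the queue, so no worker ever does that for the drained items.

```python
import concurrent.futures
import time


def do_task(t):
    time.sleep(t * 0.05)  # scaled-down sleep so the sketch finishes quickly
    return t


yielded = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_t = {executor.submit(do_task, t): t for t in range(1, 6)}
    for future in concurrent.futures.as_completed(future_to_t):
        yielded.append(future_to_t[future])
        if future_to_t[future] == 1:
            # Cancel manually instead of shutdown(cancel_futures=True).
            # cancel() returns False for futures that are already running
            # or finished, so those tasks still run to completion.
            for f in future_to_t:
                f.cancel()

print(sorted(yielded))
```

Here every future, finished or cancelled, is eventually yielded, so the loop ends on its own without a done() check.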
I was facing the same problem and wanted to use executor.shutdown() as well. If you don't mind going through the futures sequentially (in submission order) rather than using as_completed() and getting them in order of completion, you can use a for loop and check each future's status instead:

for future in futures:
    if future.cancelled():
        continue  # skip futures cancelled by shutdown(cancel_futures=True)
    result = future.result()
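A slightly more defensive sketch of the same sequential approach, assuming Python 3.9+. Instead of checking cancelled() before calling result(), it catches concurrent.futures.CancelledError, which result() raises for a cancelled future; that also covers a future cancelled while result() is blocked waiting on it. The scaled-down sleeps and the names futures and results are illustrative.

```python
import concurrent.futures
import time


def do_task(t):
    time.sleep(t * 0.05)  # scaled-down sleep so the sketch finishes quickly
    return t


results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(do_task, t) for t in range(1, 6)]
    for future in futures:  # submission order, not completion order
        try:
            # Blocks until the future finishes or is cancelled.
            result = future.result()
        except concurrent.futures.CancelledError:
            continue  # cancelled by shutdown(cancel_futures=True)
        results.append(result)
        if result == 1:
            executor.shutdown(wait=False, cancel_futures=True)
```

Because the loop walks the futures in submission order, a slow early task delays handling of faster later ones; that is the trade-off against as_completed() mentioned above.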