bbh

Does concurrent.futures.as_completed yield for cancelled futures?

I'm using ThreadPoolExecutor to run multiple tasks concurrently, and this question is about gracefully cancelling the outstanding tasks. It seems that when I cancel outstanding (i.e. not yet running) futures, concurrent.futures.as_completed never yields the cancelled futures, even though its documentation states "[as_completed] yields futures as they complete (finished or cancelled futures)."

Example:

import concurrent.futures
import time

def do_task(t):
    time.sleep(t)
    print('task',t,'finished')
    return t

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future_to_t = {executor.submit(do_task, t): t for t in range(1,6)}
    for future in concurrent.futures.as_completed(future_to_t):
        print('tasks done?',[f.done() for f in future_to_t])
        t = future_to_t[future]
        if t == 1:
            executor.shutdown(wait=False, cancel_futures=True)
            print('outstanding tasks are now cancelled')
    print('all tasks done')
print('bye')

In the example I submit 5 tasks and cancel the outstanding ones after the first task completes. Since tasks 2, 3 and 4 have already started (there are 3 workers), I expect task 5 to be cancelled and then yielded by as_completed, but that never happens. The above code prints:

task 1 finished
tasks done? [True, False, False, False, False]
outstanding tasks are now cancelled
task 2 finished
tasks done? [True, True, False, False, True]
task 3 finished
tasks done? [True, True, True, False, True]
task 4 finished
tasks done? [True, True, True, True, True]

It then hangs and never prints 'all tasks done'.

I can work around this by checking the states of the futures myself and breaking out of the loop once all of them are done, but that is exactly what I expect as_completed to do.
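A minimal sketch of that workaround applied to the example above. Assumptions: the sleep is scaled down to t / 10 seconds so it runs quickly, and run_with_early_exit and not_yet_yielded are hypothetical names chosen for illustration. The loop tracks which futures as_completed has not yielded yet and stops once all of those are cancelled.

```python
import concurrent.futures
import time


def do_task(t):
    time.sleep(t / 10)  # scaled down from the question's time.sleep(t)
    return t


def run_with_early_exit():
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_to_t = {executor.submit(do_task, t): t for t in range(1, 6)}
        not_yet_yielded = set(future_to_t)
        for future in concurrent.futures.as_completed(future_to_t):
            not_yet_yielded.discard(future)
            if not future.cancelled():
                results.append(future.result())
            if future_to_t[future] == 1:
                # Cancel every task that no worker has picked up yet.
                executor.shutdown(wait=False, cancel_futures=True)
            # Workaround: as_completed is not yielding the futures cancelled by
            # shutdown(cancel_futures=True), so stop as soon as every future
            # it has not yielded yet is cancelled.
            if all(f.cancelled() for f in not_yet_yielded):
                break
    return results


print(run_with_early_exit())
```

Checking cancelled() on the not-yet-yielded futures, rather than done() on all of them, avoids breaking out while a future that has finished but has not been yielded yet is still waiting to be processed. Task 4 may also end up cancelled if shutdown() runs before a worker picks it up, in which case only the results of tasks 1 to 3 are collected.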

What am I doing wrong? Any insights? Thanks!

Answers (1)

n2my

I was facing the same problem and wanted to use executor.shutdown() as well. If you don't mind going through the futures sequentially (in submission order) rather than getting them in order of completion from as_completed(), you can loop over them directly and check each one's status instead:

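for future in futures:
    if future.cancelled():
        continue  # a cancelled future has no result; result() would raise CancelledError
    result = future.result()  # blocks until this future has finished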
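A self-contained sketch of this approach applied to the question's example. Assumptions: the sleep is scaled down to t / 10 seconds, run_sequentially is a hypothetical name, and waiting on futures[0] stands in for the question's "cancel after the first task completes" logic.

```python
import concurrent.futures
import time


def do_task(t):
    time.sleep(t / 10)  # scaled down from the question's time.sleep(t)
    return t


def run_sequentially():
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(do_task, t) for t in range(1, 6)]
        # Wait for the first task, then cancel everything not yet started.
        futures[0].result()
        executor.shutdown(wait=False, cancel_futures=True)
        for t, future in enumerate(futures, start=1):
            if future.cancelled():
                results[t] = "cancelled"
                continue  # skip: result() would raise CancelledError
            results[t] = future.result()  # blocks until this task finishes
    return results


print(run_sequentially())
```

Because result() blocks until each remaining future is done, the loop ends once the tasks that actually started have finished, without relying on as_completed to report the cancelled ones. Whether task 4 runs or is cancelled depends on whether a worker picked it up before shutdown() was called.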
