Robb

Reputation: 65

Python concurrency first result ends waiting for not yet done results

What I want is to move on after the first True result, without waiting for the I/O-bound tasks that have not finished yet. In the example below, two() is the first (and only) function that returns True, so the program should print:

Second
Move on...

NOT:

Second
First
Third
Move on...

import concurrent.futures
import time


def one():
    time.sleep(2)
    print('First')
    return False


def two():
    time.sleep(1)
    print('Second')
    return True


def three():
    time.sleep(4)
    print('Third')
    return False


tasks = [one, two, three]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for t in tasks:
        executor.submit(t)

print('Move on...')

Upvotes: 0

Views: 1256

Answers (2)

user4815162342

Reputation: 154996

A with statement is not what you want here because it waits for all submitted jobs to finish. You need to submit the tasks, as you already do, but then call as_completed and wait only until the first task that returns True (and no longer):

executor = concurrent.futures.ThreadPoolExecutor()
futures = [executor.submit(t) for t in tasks]
for f in concurrent.futures.as_completed(futures):
    if f.result():
        break
print('Move on...')
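Pieced together with the question's functions, a complete runnable version of this answer might look like the following sketch (the prints are dropped for brevity, and first_true is my own name for the winning future, not part of the API):

```python
import concurrent.futures
import time

def one():
    time.sleep(2)
    return False

def two():
    time.sleep(1)
    return True

def three():
    time.sleep(4)
    return False

tasks = [one, two, three]
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
futures = [executor.submit(t) for t in tasks]

first_true = None
for f in concurrent.futures.as_completed(futures):
    if f.result():
        first_true = f  # two() finishes first, after about 1 second
        break

print('Move on...')
# Release the executor without blocking; the still-running
# threads are joined at interpreter exit.
executor.shutdown(wait=False)
```

Note that 'Move on...' is printed after roughly 1 second, but the interpreter still waits for the remaining worker threads before it can exit.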

Upvotes: 1

Booboo

Reputation: 44138

The problem with concurrent.futures.ThreadPoolExecutor is that once tasks are submitted, they run to completion. The program will print 'Move on...', but even if there is nothing else to do, it will not terminate until functions one and three finish (and print their messages). So the program is guaranteed to run for at least 4 seconds.

Better to use the ThreadPool class in the multiprocessing.pool module, which supports a terminate method that will kill all outstanding tasks. The closest thing to an as_completed method is probably the imap_unordered method, but that requires a single worker function for all 3 tasks. Instead, we can use apply_async with a callback function that is invoked as results become available:

from multiprocessing.pool import ThreadPool
import time
from threading import Event

def one():
    time.sleep(2)
    print('First')
    return False


def two():
    time.sleep(1)
    print('Second')
    return True


def three():
    time.sleep(4)
    print('Third')
    return False

def my_callback(result):
    if result:
        executor.terminate() # kill all other tasks
        done_event.set()

tasks = [one, two, three]
executor = ThreadPool(3)
done_event = Event()
for t in tasks:
    executor.apply_async(t, callback=my_callback)
done_event.wait()
print("Moving on ...")

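For comparison, the imap_unordered route mentioned above can be sketched by funneling the three tasks through a single dispatcher function (run_task is my own name, not part of the API):

```python
from multiprocessing.pool import ThreadPool
import time

def one():
    time.sleep(2)
    return False

def two():
    time.sleep(1)
    return True

def three():
    time.sleep(4)
    return False

def run_task(task):
    # single worker function: just call whichever task it was handed
    return task()

tasks = [one, two, three]
with ThreadPool(3) as pool:
    # imap_unordered yields results in completion order,
    # much like as_completed
    for result in pool.imap_unordered(run_task, tasks):
        if result:
            break  # the with block calls terminate() on exit

print('Move on...')
```

Because the with statement on a multiprocessing pool calls terminate() on exit, breaking out of the loop tears the pool down without waiting for the slower tasks.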
Upvotes: 2
