Reputation: 65
What I wish to do is Move on... after the first True, not caring about not yet finished I/O bound tasks. In below case two() is first and only True so the program needs to execute like this:
Second
Move on..
NOT:
Second
First
Third
Move on...
import concurrent.futures
import time
def one():
time.sleep(2)
print('First')
return False
def two():
time.sleep(1)
print('Second')
return True
def three():
time.sleep(4)
print('Third')
return False
tasks = [one, two, three]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for t in range(len(tasks)):
executor.submit(tasks[t])
print('Move on...')
Upvotes: 0
Views: 1256
Reputation: 154996
A with
statement is not what you want here because it waits for all submitted jobs to finish. You need to submit the tasks, as you already do, but then call as_completed
to wait for the first task that returns true (and no longer):
executor = concurrent.futures.ThreadPoolExecutor()
futures = [executor.submit(t) for t in tasks]
for f in concurrent.futures.as_completed(futures):
if f.result():
break
print('Move on...')
Upvotes: 1
Reputation: 44138
The problem with concurrent.futures.ThreadPoolExecutor
is that once tasks are submitted, they will run to completion so the program will print 'Move on...' but if there is in fact nothing else to do, the program will not terminate until functions one
and three
terminate and (and print their messages). So the program is guaranteed to run for at least 4 seconds.
Better to use the ThreadPool
class in the multiprocessing.pool
module which supports a terminate
method that will kill all outstanding tasks. The closest thing to an as_completed
method would probably be using the imap_unordered
method, but that requires a single worker function being used for all 3 tasks. But we can use apply_async
specifying a callback function to be invoked as results become available:
from multiprocessing.pool import ThreadPool
import time
from threading import Event
def one():
time.sleep(2)
print('First')
return False
def two():
time.sleep(1)
print('Second')
return True
def three():
time.sleep(4)
print('Third')
return False
def my_callback(result):
if result:
executor.terminate() # kill all other tasks
done_event.set()
tasks = [one, two, three]
executor = ThreadPool(3)
done_event = Event()
for t in tasks:
executor.apply_async(t, callback=my_callback)
done_event.wait()
print("Moving on ...")
Upvotes: 2