Reputation: 1476
I want to call two APIs at the same time using Python's ThreadPoolExecutor (code attached). As soon as either of the two calls responds, I want to stop the other one, because in my use case one of the two APIs takes a long time to respond and I want to avoid waiting for it.
import requests, os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_rest_api_response(url):
    return requests.get(url)

with ThreadPoolExecutor(max_workers=4) as executor:
    f1 = executor.submit(get_rest_api_response, url="REST_API_URL_1")
    f2 = executor.submit(get_rest_api_response, url="REST_API_URL_2")
    no_future_is_done = True
    # poll both futures until one of them has finished
    while(no_future_is_done):
        if f1.done():
            no_future_is_done = False
            print("f1 is done")
            output = f1.result()
            print(f2.cancel())  ######------> Failing!
        if f2.done():
            no_future_is_done = False
            print("f2 is done")
            output = f2.result()
            print(f1.cancel())  ######-------> Failing!
    print(output)
I'm using future.cancel(), but it fails and returns False. https://pd.codechef.com/docs/py/3.4.2/library/concurrent.futures.html#concurrent.futures.Future.cancel
Is there any other way I can achieve this?
Upvotes: 11
Views: 13680
Reputation: 1
For the specific situation in the question: if your get_rest_api_response function can block forever, you should give it a timeout first; otherwise the only way to stop it is to kill the thread.
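A minimal sketch of the timeout idea, under some assumptions: fake_api_call is a hypothetical stand-in for something like requests.get(url, timeout=timeout), and the delays are made-up numbers. It waits only for the first future to finish and relies on the timeout to end the slower call instead of blocking on it.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def fake_api_call(name, delay, timeout):
    # Hypothetical stand-in for requests.get(url, timeout=timeout):
    # it gives up after `timeout` seconds instead of blocking forever.
    if delay > timeout:
        time.sleep(timeout)
        raise TimeoutError(f"{name} timed out after {timeout}s")
    time.sleep(delay)
    return f"{name} responded"

def first_response(timeout=0.5):
    executor = ThreadPoolExecutor(max_workers=2)
    futures = [
        executor.submit(fake_api_call, "fast", 0.1, timeout),
        executor.submit(fake_api_call, "slow", 5.0, timeout),
    ]
    # Block only until the first of the two futures has finished.
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # Do not wait for the slower call; its thread ends by itself
    # once its timeout fires.
    executor.shutdown(wait=False)
    return done.pop().result()

result = first_response()
print(result)
```

The slower thread still runs until its timeout expires, so the timeout value bounds how long it can linger in the background.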
This is similar to @Shahzod1011's answer, but it manages the tasks and their stop signals in a class, which is more suitable if each task is complicated.
import concurrent.futures
import time

class MyTask:
    def __init__(self, task_name):
        self.task_name = task_name
        self.if_run = True

    def run(self):
        while self.if_run:
            print(f"Task '{self.task_name}' is running. '{self.if_run}'")
            time.sleep(1)

class ThreadPoolManager:
    def __init__(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
        self.task = {}  # task_name - MyTask[task_name]
        self.task_execution_pool = {}  # task_name - future

    def add_task(self, task_name):
        self.task[task_name] = MyTask(task_name)
        future = self.executor.submit(self.task[task_name].run)
        self.task_execution_pool[task_name] = future

    def stop_task(self, task_name):
        if task_name in self.task_execution_pool:
            self.task[task_name].if_run = False
            self.task.pop(task_name)
            print(f"stopping task'{task_name}'")
            future = self.task_execution_pool.pop(task_name)
            future.cancel()

if __name__ == "__main__":
    thread_pool = ThreadPoolManager()
    # Add tasks
    thread_pool.add_task("Task1")
    thread_pool.add_task("Task2")
    time.sleep(5)
    print("end of sleep")
    # Stop tasks
    thread_pool.stop_task("Task1")
    thread_pool.stop_task("Task2")
    thread_pool.executor.shutdown(wait=True)
You can modify MyTask and add_task in ThreadPoolManager for your case: one MyTask with two run functions, one for each API.
Upvotes: 0
Reputation: 129
Although the ThreadPoolExecutor does not provide a facility to stop running tasks, we can update our target task functions to stop running when a thread-safe flag is set. This can be implemented using threading.Event.
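To illustrate why cancel() returns False in the question, here is a small sketch. The slow_task function and the timings are assumptions for illustration. With a single worker, the first task is already running when cancel() is called, while the second is still waiting in the queue.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_task():
    # hypothetical task that simply takes a while
    time.sleep(0.3)
    return "done"

with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow_task)
    queued = executor.submit(slow_task)
    # give the single worker time to pick up the first task
    time.sleep(0.1)
    # a task that is already executing cannot be cancelled
    cancelled_running = running.cancel()
    # a task that is still waiting in the queue can be
    cancelled_queued = queued.cancel()

print(cancelled_running, cancelled_queued)
```

In the question both requests start immediately because the pool has free workers, so neither future is still pending when cancel() is called.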
First, create a threading.Event to control when running tasks should stop.
from threading import Event
...
# create an event to shut down all running tasks
event = Event()
This event can then be passed to each target task function as an argument.
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    # execute the task, passing the event
    future = executor.submit(work, event)
Once the task is running, it can be stopped from the main thread by setting the flag on the Event.
...
# stop running tasks via the event
event.set()
Each target task function must check the status of the event frequently, such as within each iteration of a loop within the task.
If the event is set, the task can then stop running, perhaps by returning immediately.
...
# check the status of the flag
if event.is_set():
    return
If your target task function has some resources open, it may need to clean up before returning.
This approach to stop running tasks may require that you change the structure of your task so that you have a loop structure allowing you to check the value of the flag.
For example, if your task reads data from a file or a socket, you may need to change the read operation so that it reads blocks of data in a loop, which lets you check the status of the flag on each iteration.
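As a rough sketch of such a block-wise read, assuming a hypothetical read_in_blocks helper and an in-memory stream standing in for a file or socket, the loop below checks the event before every read:

```python
import io
from threading import Event

def read_in_blocks(stream, event, block_size=4):
    # Hypothetical helper: read small blocks and check the stop flag
    # before each read, returning whatever was read so far.
    chunks = []
    while not event.is_set():
        block = stream.read(block_size)
        if not block:
            # end of stream
            break
        chunks.append(block)
    return b"".join(chunks)

stop = Event()
data = b"abcdefghij"
# flag not set: the whole stream is read
full = read_in_blocks(io.BytesIO(data), stop)
# flag set: the loop exits before the first read
stop.set()
partial = read_in_blocks(io.BytesIO(data), stop)
print(full, partial)
```

A smaller block size makes the task react to the flag sooner, at the cost of more read calls.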
# SuperFastPython.com
# example of stopping a running task in a thread pool
from time import sleep
from threading import Event
from concurrent.futures import ThreadPoolExecutor

# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(10):
        # pretend to read some data
        sleep(1)
        # check if the task should stop
        if event.is_set():
            return

# create an event used to stop running tasks
event = Event()
# create a thread pool
with ThreadPoolExecutor() as executor:
    # execute one task
    future = executor.submit(work, event)
    # wait a moment
    print('The task is running...')
    sleep(2)
    # cancel the task, just in case it is not yet running
    future.cancel()
    # stop the running task using the flag
    print('Stopping the task...')
    event.set()
    # waiting for all tasks to complete
    print('Waiting...')
print('All done.')
Running the example first creates a thread pool and schedules a task.
An event object is created and passed to the task where it is checked each iteration to see if it has been set and if so to bail out of the task.
The task runs normally for two seconds. We then cancel the future, just in case the task has not yet started executing.
We then set the event to trigger the running task to stop. The task checks the status of the event each second, and stops executing on the next iteration after the event has been set.
The task is running...
Stopping the task...
Waiting...
All done.
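Applied to the two-API case from the question, the same idea might look like the sketch below. The poll_api function is a hypothetical stand-in for an API call that is performed in small steps, and the step counts are made up. The main thread waits for the first future to finish and then sets the event so the other task returns early.

```python
import time
from threading import Event
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def poll_api(name, steps, event):
    # Hypothetical stand-in for an API call done in small steps,
    # checking the stop flag before each step.
    for _ in range(steps):
        if event.is_set():
            return f"{name} stopped early"
        time.sleep(0.05)
    return f"{name} finished"

event = Event()
with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(poll_api, "fast", 2, event)
    slow = executor.submit(poll_api, "slow", 100, event)
    # block until the first of the two tasks has finished
    done, _ = wait([fast, slow], return_when=FIRST_COMPLETED)
    # tell the remaining task to stop at its next check
    event.set()
    winner = done.pop().result()
    loser = slow.result()

print(winner)
print(loser)
```

This only works if the slow call can be split into steps. A single blocking requests.get cannot check the flag while it waits, which is where a request timeout helps.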
The original answer can be found here!
Upvotes: 11