Reputation: 3177
I have a project in Python 3.5 without any usage of asynchronous features. I have to implement the folowing logic:
def should_return_in_3_sec(some_serious_job, arguments, finished_callback):
# Start some_serious_job(*arguments) in a task
# if it finishes within 3 sec:
# return result immediately
# otherwise return None, but do not terminate task.
# If the task finishes in 1 minute:
# call finished_callback(result)
# else:
# call finished_callback(None)
pass
The function should_return_in_3_sec()
should remain synchronous, but it is up to me to write any new asynchronous code (including some_serious_job()
).
What is the most elegant and pythonic way to do it?
Upvotes: 1
Views: 1324
Reputation: 30482
The threading
module has some simple timeout options, see Thread.join(timeout)
for example.
If you do choose to use asyncio, below is a a partial solution to address some of your needs:
import asyncio
import time
async def late_response(task, flag, timeout, callback):
done, pending = await asyncio.wait([task], timeout=timeout)
callback(done.pop().result() if done else None) # will raise an exception if some_serious_job failed
flag[0] = True # signal some_serious_job to stop
return await task
async def launch_job(loop, some_serious_job, arguments, finished_callback,
timeout_1=3, timeout_2=5):
flag = [False]
task = loop.run_in_executor(None, some_serious_job, flag, *arguments)
done, pending = await asyncio.wait([task], timeout=timeout_1)
if done:
return done.pop().result() # will raise an exception if some_serious_job failed
asyncio.ensure_future(
late_response(task, flag, timeout_2, finished_callback))
return None
def f(flag, n):
for i in range(n):
print("serious", i, flag)
if flag[0]:
return "CANCELLED"
time.sleep(1)
return "OK"
def finished(result):
print("FINISHED", result)
loop = asyncio.get_event_loop()
result = loop.run_until_complete(launch_job(loop, f, [1], finished))
print("result:", result)
loop.run_forever()
This will run the job in a separate thread (Use loop.set_executor(ProcessPoolExecutor())
to run a CPU intensive task in a process instead). Keep in mind it is a bad practice to terminate a process/thread - the code above uses a very simple list to signal the thread to stop (See also threading.Event
/ multiprocessing.Event
).
While implementing your solution, you might discover you would want to modify your existing code to use couroutines instead of using threads.
Upvotes: 1
Reputation: 59436
Fork off a thread doing the serious job, let it write its result into a queue and then terminate. Read in your main thread from that queue with a timeout of three seconds. If the timeout occurs, start another thread and return None. Let the second thread read from the queue with a timeout of one minute; if that timeouts also, call finished_callback(None); otherwise call finished_callback(result).
I sketched it like this:
import threading, queue
def should_return_in_3_sec(some_serious_job, arguments, finished_callback):
result_queue = queue.Queue(1)
def do_serious_job_and_deliver_result():
result = some_serious_job(arguments)
result_queue.put(result)
threading.Thread(target=do_serious_job_and_deliver_result).start()
try:
result = result_queue.get(timeout=3)
except queue.Empty: # timeout?
def expect_and_handle_late_result():
try:
result = result_queue.get(timeout=60)
except queue.Empty:
finished_callback(None)
else:
finished_callback(result)
threading.Thread(target=expect_and_handle_late_result).start()
return None
else:
return result
Upvotes: 3