Reputation: 730
I need to wrap a coroutine that returns data. Once the data has been returned, it is no longer available. If the coroutine is cancelled, the data remains available for the next call. I need the wrapping coroutine to behave the same way, but sometimes it is cancelled after the wrapped coroutine has already finished.
I can reproduce this behavior with the following code.
import asyncio

loop = asyncio.get_event_loop()
fut = asyncio.Future()

async def wait():
    return await fut

task = asyncio.ensure_future(wait())

async def test():
    await asyncio.sleep(0.1)
    fut.set_result('data')
    print('fut', fut)
    print('task', task)
    task.cancel()
    await asyncio.sleep(0.1)
    print('fut', fut)
    print('task', task)

loop.run_until_complete(test())
The output clearly shows that the wrapping coroutine was cancelled after the wrapped coroutine finished, meaning that the data is lost forever. I cannot shield the call either, because if I am cancelled I have no data to return anyway.
fut <Future finished result='data'>
task <Task pending coro=<wait() running at <ipython-input-8-6d115ded09c6>:7> wait_for=<Future finished result='data'>>
fut <Future finished result='data'>
task <Task cancelled coro=<wait() done, defined at <ipython-input-8-6d115ded09c6>:6>>
In my case, this happens because two futures, one that completes the wrapped coroutine and one that cancels the wrapping coroutine, are sometimes resolved at the same time. I could probably delay the cancel (via asyncio.sleep(0)), but can I be sure it will never happen by accident?
The problem makes more sense with a task:
import asyncio

loop = asyncio.get_event_loop()
data = []
fut_data = asyncio.Future()

async def get_data():
    while not data:
        await asyncio.shield(fut_data)
    return data.pop()

fut_wrapper = asyncio.Future()  # not used in this example

async def wrapper_data():
    task = asyncio.ensure_future(get_data())
    return await task

async def test():
    task = asyncio.ensure_future(wrapper_data())
    await asyncio.sleep(0)
    data.append('data')
    fut_data.set_result(None)
    await asyncio.sleep(0)
    print('wrapper_data', task)
    task.cancel()
    await asyncio.sleep(0)
    print('wrapper_data', task)
    print('data', data)

loop.run_until_complete(test())
task <Task cancelled coro=<wrapper_data() done, defined at <ipython-input-2-93645b78e9f7>:16>>
data []
The data has been consumed but the task has been cancelled, so the data cannot be retrieved. Awaiting get_data() directly would work, but then it cannot be cancelled.
Upvotes: 0
Views: 162
Reputation: 155046
I think you need to first shield the awaited future from cancellation, then detect your own cancellation. If the future hasn't completed, propagate the cancellation into it (effectively undoing the shield()) and out to the caller. If the future has completed, ignore the cancellation and return the data.
The code would look like this. I also changed it to avoid global variables and to use asyncio.run() (which you can replace with run_until_complete() if you're using Python 3.6):
import asyncio

async def wait(fut):
    try:
        return await asyncio.shield(fut)
    except asyncio.CancelledError:
        if fut.done():
            # we've been canceled, but we have the data - ignore the
            # cancel request
            return fut.result()
        # otherwise, propagate the cancellation into the future
        fut.cancel()
        # ...and to the caller
        raise

async def test():
    loop = asyncio.get_event_loop()
    fut = loop.create_future()
    task = asyncio.create_task(wait(fut))
    await asyncio.sleep(0.1)
    fut.set_result('data')
    print('fut', fut)
    print('task', task)
    task.cancel()
    await asyncio.sleep(0.1)
    print('fut', fut)
    print('task', task)

asyncio.run(test())
Note that ignoring the cancel request can be thought of as abuse of the cancellation mechanism. But if the task is known to proceed afterwards (ideally immediately finishing), it might be the right thing in your situation. Caution is advised.
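To check that both branches of wait() behave as described, here is a small sketch that exercises each one separately. The helper names cancel_after_result() and cancel_before_result() are made up for this illustration, and it assumes Python 3.7+ for asyncio.run() and asyncio.get_running_loop(); treat it as a quick sanity check rather than a complete test.

```python
import asyncio


async def wait(fut):
    # Same pattern as above: shield the future, then decide what to do
    # when this coroutine itself is cancelled.
    try:
        return await asyncio.shield(fut)
    except asyncio.CancelledError:
        if fut.done():
            # The data already arrived: swallow the cancellation.
            return fut.result()
        # No data yet: undo the shield and let the cancellation through.
        fut.cancel()
        raise


async def cancel_after_result():
    # Hypothetical helper: the result and the cancel request arrive
    # without yielding to the event loop in between.
    fut = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(wait(fut))
    await asyncio.sleep(0)  # let the task start and block on the shield
    fut.set_result('data')
    task.cancel()
    return await task  # the task should still deliver the data


async def cancel_before_result():
    # Hypothetical helper: the cancel request arrives while the future
    # is still pending, so it should propagate normally.
    fut = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(wait(fut))
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Report whether the cancellation reached the inner future.
        return fut.cancelled()
    return None


if __name__ == "__main__":
    print(asyncio.run(cancel_after_result()))
    print(asyncio.run(cancel_before_result()))
```

The first helper should return the data even though the task was cancelled, and the second should report that the pending future was cancelled along with the task.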
Upvotes: 1