Reputation: 16032
I have a group of tasks each of which make a request to a standard oauth API endpoint, and are dependent on a bearer_token. The tasks will raise an exception during response processing if the bearer_token is expired. There is also a refresh_bearer_token
task, which handles updating the token once it expires.
Here is a pseudo-code for that:
from proj.celery import app
bearer_token = '1234'
class OauthError(Exception):
pass
@app.task
def api_request():
response = request(bearer_token, ...)
if response.bearer_token_expired:
raise OauthError('oauth')
@app.task
def refresh_bearer_token():
...
How can I schedule the refresh_bearer_token
task to execute whenever an OauthError
is raised?
The only solution I can find is using the link_error
kwarg like this:
@app.task
def error_callback(uuid):
exception_msg = AsyncResult(uuid).get(propagate=False, disable_sync_subtasks=False)
if exception_msg = 'oauth':
refresh_bearer_token.delay()
else:
raise
api_request.apply_async(link_error=error_callback.s())
But this seems sub-optimal for several reasons, most notably because it spawns a synchronous child task within another synchronous child task, which strongly discourged in the docs.
Is there a more pythonic way of exception catching in celery?
For example:
def catch(func_that_requires_oauth):
try:
func_that_requires_oauth.delay()
except OauthError:
refresh_bearer_token.delay() | func_that_requires_oauth.delay()
Upvotes: 2
Views: 2468
Reputation: 32244
Just throwing some ideas out there. You could create a base task that waits or retries if a lock has been acquired by the refresh_bearer_token task whenever it's called. When it fails it kicks off the refresh_bearer_token task and also retires itself.
A retry puts a copy of the running task to the back of the queue
Now you would have to implement some kind of locking, if the lock is already acquired the refresh_bearer_token just does nothing as another task should be updating it. You would need to add a TTL to this "lock" too to prevent some conditions when the refresh_bearer_token
task fails
@app.task
def refresh_bearer_token():
try:
with aquire_lock(timeout=0):
refresh_token()
except TimeoutError:
pass
class RequiresOauthTask(app.Task):
abstract = True
def __call__(self, *args, **kwargs):
if lock_is_present():
self.retry() # or wait?
return super().__call__(*args, **kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, OauthError):
refresh_bearer_token.delay()
self.retry()
super().on_failure(exc, task_id, args, kwargs, einfo)
@app.task(base=RequiresOauthTask)
def my_task():
pass
Upvotes: 2