Lord Elrond

Reputation: 16032

Exception handling in Celery?

I have a group of tasks, each of which makes a request to a standard OAuth API endpoint and depends on a bearer_token. A task will raise an exception during response processing if the bearer_token has expired. There is also a refresh_bearer_token task, which handles updating the token once it expires.

Here is pseudo-code for that:

from proj.celery import app

bearer_token = '1234'

class OauthError(Exception):
    pass

@app.task
def api_request():
    response = request(bearer_token, ...)
    if response.bearer_token_expired:
        raise OauthError('oauth')

@app.task
def refresh_bearer_token():
    ...

How can I schedule the refresh_bearer_token task to execute whenever an OauthError is raised?

The only solution I can find is using the link_error kwarg like this:

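from celery.result import AsyncResult

@app.task
def error_callback(uuid):
    # With propagate=False, get() returns the exception instance instead of raising it
    exception_msg = AsyncResult(uuid).get(propagate=False, disable_sync_subtasks=False)
    if str(exception_msg) == 'oauth':
        refresh_bearer_token.delay()
    else:
        raise exception_msg

api_request.apply_async(link_error=error_callback.s())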

But this seems sub-optimal for several reasons, most notably because it spawns a synchronous child task within another synchronous child task, which is strongly discouraged in the docs.

Is there a more Pythonic way of catching exceptions in Celery?

For example:

def catch(func_that_requires_oauth):
    try:
        func_that_requires_oauth.delay()
    except OauthError:
        refresh_bearer_token.delay() | func_that_requires_oauth.delay()

Upvotes: 2

Views: 2468

Answers (1)

Iain Shelvington

Reputation: 32244

Just throwing some ideas out there. You could create a base task that waits or retries whenever it is called while the refresh_bearer_token task holds a lock. When the task fails with an OauthError, it kicks off the refresh_bearer_token task and also retries itself.

A retry puts a copy of the running task at the back of the queue.
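To make the retry flow concrete, here is a plain-Python model of it. This is only a sketch and not how Celery is implemented: the deque stands in for the broker queue, a single loop stands in for one worker, and the token dict and log list are made up for illustration.

```python
from collections import deque


class OauthError(Exception):
    pass


token = {"value": "expired"}  # hypothetical shared token state
queue = deque()               # stands in for the broker queue
log = []                      # records what the "worker" did, in order


def refresh_bearer_token():
    token["value"] = "fresh"
    log.append("refresh")


def api_request():
    if token["value"] == "expired":
        raise OauthError("oauth")
    log.append("request ok")


def run_worker():
    # Process tasks one at a time until the queue is empty.
    while queue:
        task = queue.popleft()
        try:
            task()
        except OauthError:
            log.append("request failed")
            queue.append(refresh_bearer_token)  # schedule the refresh first
            queue.append(task)                  # "retry": re-enqueue at the back


queue.extend([api_request, api_request])
run_worker()
print(log)
```

In this model both requests fail before any refresh has run, so the refresh is scheduled twice, once per failed request.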

Now you would have to implement some kind of locking: if the lock is already acquired, refresh_bearer_token just does nothing, as another task should already be updating the token. You would also need to add a TTL to this "lock" so that it is not held forever if the refresh_bearer_token task fails.

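@app.task
def refresh_bearer_token():
    try:
        # acquire_lock is a placeholder for your own locking helper
        with acquire_lock(timeout=0):
            refresh_token()
    except TimeoutError:
        # Another task already holds the lock and is refreshing the token
        pass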


class RequiresOauthTask(app.Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        if lock_is_present():
            self.retry()  # or wait?
        return super().__call__(*args, **kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, OauthError):
            refresh_bearer_token.delay()
            self.retry()
        super().on_failure(exc, task_id, args, kwargs, einfo)


@app.task(base=RequiresOauthTask)
def my_task():
    pass
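
The lock helpers used above are left undefined. As a rough idea of what a lock with a TTL could look like, here is a minimal in-memory sketch. The TTLLock class, its clock parameter, and the extra arguments to refresh_bearer_token are all hypothetical names for illustration. Since Celery workers usually run in separate processes, a real lock would need to live in a shared store such as a cache or database rather than in process memory.

```python
import time


class TTLLock:
    """In-memory stand-in for a shared lock that expires after `ttl` seconds."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock       # injectable so the expiry can be tested
        self.expires_at = None   # None means nobody holds the lock

    def is_present(self):
        # A lock past its expiry time counts as released.
        return self.expires_at is not None and self.clock() < self.expires_at

    def acquire(self):
        # Non-blocking: return False right away if someone else holds the lock.
        if self.is_present():
            return False
        self.expires_at = self.clock() + self.ttl
        return True

    def release(self):
        self.expires_at = None


def refresh_bearer_token(lock, refresh_token):
    """Run refresh_token() unless another refresh already holds the lock."""
    if not lock.acquire():
        return False  # another task should already be updating the token
    try:
        refresh_token()
    finally:
        # If the process dies before this runs, the TTL frees the lock instead.
        lock.release()
    return True
```

With something like this, the base task's check would call lock.is_present(), and a refresh that crashes midway only blocks other tasks until the TTL runs out.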

Upvotes: 2
