jdotjdot

Reputation: 17042

Dealing with exception handling and re-queueing in RQ on Heroku

I have a website running on Heroku in Python, and I have a worker up as a background process to handle tasks that I don't want to block webpage delivery and therefore are inappropriate for the web dynos. For this, I've set up a queue using rq and redis.

In my process, occasionally, custom exceptions might arise. For a specific subset of these, rather than allow the job to go straight to the 'failed' queue, I want to requeue it a few times. I've been looking at the exception handlers page on the rq homepage, and I'm unclear on a few things. In particular, it describes the following way to write an exception handler:

def my_handler(job, exc_type, exc_value, traceback):
    # do custom things here
    # for example, write the exception info to a DB
    ...

Right now, I'm planning to do something along the lines of:

from rq import requeue_job

def my_handler(job, exc_type, exc_value, traceback):
    if exc_type == "MyCustomError":
        # use .get() so the first failure doesn't raise a KeyError
        job.meta['MyErrorCount'] = job.meta.get('MyErrorCount', 0) + 1
        job.save()

        if job.meta['MyErrorCount'] >= 10:
            return True
        else:
            requeue_job(job.id)
            return False

Questions:

Upvotes: 8

Views: 6381

Answers (3)

Vishal Vasnani

Reputation: 531

The answer shared by Jökull here is along the right lines, but that answer (and the post) is quite old. I had a similar situation where I needed retry logic in my redis queue whenever a job failed. Here is a snippet for queue workers (I had to move the logic into the worker), taking inspiration from Jökull's answer (and its comments), the redis-queue docs, and this:

from rq import Worker, Connection, Queue
from redis import Redis

conn = Redis()
max_retries = 3

def retry_handler(job, exc_type, exc_value, traceback):
    # Returning True moves the job to the failed queue (or continues to
    # the next handler)
    job.meta.setdefault('failures', 1)
    job.meta['failures'] += 1
    if job.meta['failures'] > max_retries:
        job.save()
        return True
    # I was unable to locate Status in rq, so I set the plain string
    # 'queued' that rq actually expects
    job.status = 'queued'
    for queue_ in Queue.all(connection=conn):
        if queue_.name == job.origin:
            # at_front=True enqueues the job at the front of the queue
            # for immediate retry
            queue_.enqueue_job(job, at_front=True)
            break
    else:
        return True  # Queue has disappeared, fail job

    return False  # Job is handled. Stop the handler chain.

if __name__ == "__main__":
    with Connection(conn):
        worker = Worker([Queue('your_queue_name')],
                        exception_handlers=[retry_handler])
        worker.work()

Upvotes: 0

Jökull

Reputation: 503

Here’s my solution

queues = []

def retry_handler(job, exc_type, exc_value, traceback):
    # Returning True moves the job to the failed queue (or continue to
    # the next handler)

    job.meta.setdefault('failures', 1)
    job.meta['failures'] += 1
    if job.meta['failures'] > 3 or isinstance(exc_type, (LookupError, CorruptImageError)):
        job.save()
        return True

    job.status = Status.QUEUED
    for queue_ in queues:
        if queue_.name == job.origin:
            queue_.enqueue_job(job, timeout=job.timeout)
            break
    else:
        return True  # Queue has disappeared, fail job

    return False  # Job is handled. Stop the handler chain.

queues.append(Queue(exc_handler=retry_handler))

I decided to retry all errors three times unless a certain known exception type was encountered. This lets me respect failures that are understood: for example, a user was deleted after the job was created but before it was executed, or, in an image-resize job, the provided image is no longer found (HTTP 404) or is not in a readable format. Basically, whenever I know the code will never be able to handle the job.

To answer your question: exc_type is the class, exc_value is the exception instance. traceback is useful for logging. If you care about this, check out Sentry. Workers are automatically configured with a Sentry error handler if run with SENTRY_DSN in the context. Much cleaner than polluting your own db with error logs.
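To make the distinction concrete, here is a small standalone sketch (plain Python, no rq required): `sys.exc_info()` returns the same `(type, value, traceback)` triple that rq passes to an exception handler, where the type is the class itself and the value is an instance of it.

```python
import sys

class MyCustomError(Exception):
    pass

try:
    raise MyCustomError("something went wrong")
except MyCustomError:
    # sys.exc_info() returns (type, value, traceback) -- the same
    # three objects rq hands to an exception handler
    exc_type, exc_value, tb = sys.exc_info()

assert exc_type is MyCustomError             # the class itself
assert isinstance(exc_value, MyCustomError)  # an instance of the class
assert str(exc_value) == "something went wrong"
```

This is why comparing `exc_type` to a string like `"MyCustomError"` never matches; compare against the class instead.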

Upvotes: 6

iMom0

Reputation: 12911

  1. For more info, read the docs for sys (the handler receives the same triple that sys.exc_info returns).
  2. False means stop processing exceptions; True means continue and fall through to the next exception handler on the stack.

You can register multiple exception handlers for the same job. exc_type is the exception type (a class, not a string), so you should correct your code; for other errors your handler will fall off the end and return None, which is interpreted as True, as the rq docs say.
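That fall-through behavior can be sketched without rq at all. The snippet below is a toy simulation of the handler chain described above (the `run_handlers` helper is hypothetical, not an rq API): only an explicit `False` stops the chain, while `None` or `True` passes the exception to the next handler.

```python
class MyCustomError(Exception):
    pass

def run_handlers(handlers, job, exc_type, exc_value, tb):
    """Mimic rq's chain: run handlers until one returns False."""
    for handler in handlers:
        result = handler(job, exc_type, exc_value, tb)
        if result is False:  # only an explicit False stops the chain
            return 'handled'
    return 'moved to failed queue'

calls = []

def custom_handler(job, exc_type, exc_value, tb):
    calls.append('custom')
    if issubclass(exc_type, MyCustomError):  # compare classes, not strings
        return False  # pretend we requeued the job; stop the chain
    # returning None falls through to the next handler

def fallback_handler(job, exc_type, exc_value, tb):
    calls.append('fallback')
    return True  # True also falls through, toward the failed queue

# A MyCustomError is stopped by the first handler:
assert run_handlers([custom_handler, fallback_handler],
                    None, MyCustomError, MyCustomError(), None) == 'handled'
# A ValueError falls through (None) to the fallback, then fails:
assert run_handlers([custom_handler, fallback_handler],
                    None, ValueError, ValueError(), None) == 'moved to failed queue'
assert calls == ['custom', 'custom', 'fallback']
```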

Upvotes: -2
