cary

Reputation: 31

How to retry a Redis Queue (RQ) job on a different queue?

I am using Python with RQ to manage jobs. Occasionally a job fails, usually due to insufficient memory or a timeout. I would like these jobs to be retried, but with more resources dedicated to the ECS task. I have multiple job queues and a different worker for each queue, with resource assignments tailored to each queue type (e.g. small, medium and large). When a job fails, I would like to move it to the next larger queue (e.g. a failed job on the medium queue should be retried on the large queue). Below is my worker code; I've tried a few different approaches, but nothing seems to actually work.


import os
from resource import struct_rusage

from redis import Redis
from rq import Connection, Queue, Worker
from rq.job import Job

redis_connection = Redis()  # assumed default local Redis connection

# Queue(s) this worker listens on and the queue to use for retries
listen = [os.getenv('REDIS_QUEUE', 'small')]
retry_queue = os.getenv('RETRY_QUEUE', 'medium')

rqn_small = 'small'
rqn_medium = 'medium'
rqn_large = 'large'

rq_small = Queue(rqn_small, connection=redis_connection)
rq_medium = Queue(rqn_medium, connection=redis_connection)
rq_large = Queue(rqn_large, connection=redis_connection)

def retry_job(job: Job):
    if retry_queue == rqn_small:
        #job.retry(queue=rqn_small)
        job.retry(queue=rq_small)
    elif retry_queue == rqn_medium:
        #job.retry(rqn_medium)
        job.retry(rq_medium)
    elif retry_queue == rqn_large:
        #job.retry(queue=rqn_large)
        job.retry(queue=rq_large)
    else:
        job.retry() # stay on current queue

def retry_handler(job, exc_type, exception, traceback):
    if job.retries_left:
        retry_job(job)
        return False # No more handlers
    return True # Allow other handlers
    
def work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: struct_rusage):
    # This is called when ECS terminates a task due to scaling policies and the job was unable to finish
    retry_job(job)
    return False

if __name__ == '__main__':
    with Connection(redis_connection):
        worker = Worker(map(Queue, listen), 
                        exception_handlers=[retry_handler], 
                        work_horse_killed_handler=work_horse_killed_handler)
        worker.work(with_scheduler=True)
    print('Worker has exited') 

I thought about scheduling a new job and letting this one fail, but I would like to keep the same job ID, since I return it to the client when the job is submitted.

Upvotes: 0

Views: 635

Answers (1)

cary

Reputation: 31

I believe the only way to tackle this is to create a new job on the desired queue. I linked the two jobs together using the job metadata, so the initial job, which ends up in the canceled job registry, can be followed to the replacement job to get its status. This seems to work fine. One thing I discovered along the way: an exception thrown in the work_horse_killed_handler is very hard to track down, so be very careful with that code.
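For reference, here is a minimal sketch of that approach, assuming the redis_connection from the question. NEXT_QUEUE and resubmit_on_larger_queue are illustrative names, not part of RQ: the helper re-enqueues the failed job's callable and arguments on the next larger queue and cross-links the two jobs through job.meta.

from redis import Redis
from rq import Queue
from rq.job import Job

redis_connection = Redis()  # same connection the worker uses (assumed)

# Hypothetical mapping from a queue name to the next larger queue
NEXT_QUEUE = {'small': 'medium', 'medium': 'large', 'large': 'large'}

def resubmit_on_larger_queue(job: Job) -> Job:
    # Enqueue a copy of the failed job's callable and arguments on the next larger queue
    target = Queue(NEXT_QUEUE.get(job.origin, 'large'), connection=redis_connection)
    replacement = target.enqueue(job.func_name, args=job.args, kwargs=job.kwargs)

    # Cross-link the two jobs through their metadata so the original job ID,
    # which was already returned to the client, still leads to a status
    job.meta['replacement_job_id'] = replacement.id
    job.save_meta()
    replacement.meta['original_job_id'] = job.id
    replacement.save_meta()
    return replacement

A client-side status lookup can then follow the link from the original job ID:

def get_status(job_id: str) -> str:
    job = Job.fetch(job_id, connection=redis_connection)
    # Follow the chain of replacement jobs, if any
    while 'replacement_job_id' in job.meta:
        job = Job.fetch(job.meta['replacement_job_id'], connection=redis_connection)
    return job.get_status()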

Upvotes: 0
