CIsForCookies

Reputation: 12807

Trying to cancel a running rq job raises NoRedisConnectionException

I have a singleton queue manager built on rq that exposes three methods: (1) enqueue, (2) status, and (3) clear. The problem appears when I try to clear the rq database after a job has been queued and has started running.

I look up the job ID with rq.registry.StartedJobRegistry and then cancel the job with rq.job.cancel_job, but this raises an exception about my Redis connection. I don't understand why the connection can't be resolved.

The error traceback:

Traceback (most recent call last):
    cancel_job(job_id)
  File "lib/python3.8/site-packages/rq/job.py", line 53, in cancel_job
    Job.fetch(job_id, connection=connection).cancel()
  File "lib/python3.8/site-packages/rq/job.py", line 298, in fetch
    job = cls(id, connection=connection, serializer=serializer)
  File "lib/python3.8/site-packages/rq/job.py", line 327, in __init__
    self.connection = resolve_connection(connection)
  File "lib/python3.8/site-packages/rq/connections.py", line 69, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection')
rq.connections.NoRedisConnectionException: Could not resolve a Redis connection

queue_manager.py


import atexit
import os
import redis
import rq
import signal
import subprocess

from rq.job import cancel_job
from rq.registry import StartedJobRegistry


class Singleton(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class QueueManager(metaclass=Singleton):
    """Simple rq-based queue manager. Handles worker & queue setup and deletion
    
    Kwargs:
        default_timeout (int): timeout after which the worker will cancel its job
    """
    def __init__(self, default_timeout=86400):
        self._conn = redis.Redis()
        self._queues = {}
        self._workers = {}
        self._default_timeout = default_timeout
        atexit.register(self.clear)

    def _spawn_worker(self, name, queue):
        pid = subprocess.Popen(['rqworker', '-n', name, queue]).pid
        print(f'Worker {name} was spawned (pid: {pid})')
        self._workers[name] = pid

    def _add_queue(self, name):
        self._queues[name] = rq.Queue(
            connection=self._conn,
            default_timeout=self._default_timeout,
            name=name,
        )
        print(f'Queue {name} was created')

    def enqueue(self, queue_name, func, func_arg_ls):
        try:
            self._queues[queue_name].enqueue(func, *func_arg_ls)
        except KeyError:
            self._add_queue(queue_name)
            self._spawn_worker(queue_name, queue_name)
            self._queues[queue_name].enqueue(func, *func_arg_ls)

    def status(self):
        return subprocess.run(['rqinfo'], stdout=subprocess.PIPE).stdout.decode()

    def clear(self):
        """Clears all queues and workers"""
        for q in self._queues:
            for job_id in StartedJobRegistry(q, connection=self._conn).get_job_ids():
                cancel_job(job_id)

        self._conn.flushdb()

        for worker in self._workers:
            os.kill(self._workers[worker], signal.SIGTERM)
            print(f'Worker {worker} (pid: {self._workers[worker]}) was terminated.')

x.py


import time

def f(n):
    time.sleep(n)

How to reproduce:

Python 3.8.0 (default, Nov  6 2019, 15:26:17)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.18.1 -- An enhanced Interactive Python. Type '?' for help.
                                                                                 
In [1]: from queue_manager import QueueManager             
               
In [2]: from x import f                                      
       
In [3]: q = QueueManager()

In [4]: q.enqueue('queue_name', f, (1000,))                                              
Queue queue_name was created    
Worker queue_name was spawned (pid: 25822)
                                                                           
In [5]: 20:13:50 Worker rq:worker:queue_name: started, version 1.5.2
20:13:50 *** Listening on queue_name...
20:13:50 Cleaning registries for queue: queue_name
20:13:50 queue_name: x.f(1000) (149e7d64-26fb-497a-8e3a-bc1dc08d571f)                        
In [5]: 
                                                                          
In [5]: q.clear() 

Upvotes: 0

Views: 1146

Answers (1)

GProst

Reputation: 10227

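You forgot to pass your connection to cancel_job. Without a connection argument, rq falls back to resolve_connection (visible in your traceback), finds no current connection, and raises NoRedisConnectionException. Pass the connection explicitly: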

cancel_job(job_id, connection=self._conn)
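
As a rough sketch of how that fix could be applied to the loop in clear(), the helper below passes the same connection to both StartedJobRegistry and cancel_job. The name cancel_started_jobs and the registry_cls / cancel parameters are hypothetical (they are not part of rq); they only exist so the function can be exercised without a Redis server, and the real rq objects are imported when they are left as None.

```python
def cancel_started_jobs(queue_names, conn, registry_cls=None, cancel=None):
    """Cancel every started job on the given queues using an explicit connection.

    registry_cls and cancel are hypothetical injection points for testing;
    when omitted, rq's StartedJobRegistry and cancel_job are used.
    """
    if registry_cls is None:
        from rq.registry import StartedJobRegistry
        registry_cls = StartedJobRegistry
    if cancel is None:
        from rq.job import cancel_job
        cancel = cancel_job

    cancelled = []
    for name in queue_names:
        # The registry already receives the connection in the original code.
        registry = registry_cls(name, connection=conn)
        for job_id in registry.get_job_ids():
            # The missing piece: cancel_job needs the connection as well.
            cancel(job_id, connection=conn)
            cancelled.append(job_id)
    return cancelled
```

Inside QueueManager.clear(), the nested loop could then become cancel_started_jobs(self._queues, self._conn), since iterating over the self._queues dict yields the queue names.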

Upvotes: 1
