Reputation: 12807
I have a singleton queue manager based on rq
that has (1) enqueue, (2) status & (3) clear API. I have a problem with trying to clear the rq db after a job was queued and started to run.
I'm trying to find the job id using rq.registry.StartedJobRegistry
and then cancel the job using rq.job.cancel_job
but an exception regarding my redis connection is raised. I don't get why my redis connection can't be resolved.
The error traceback:
Traceback (most recent call last):
cancel_job(job_id)
File "lib/python3.8/site-packages/rq/job.py", line 53, in cancel_job
Job.fetch(job_id, connection=connection).cancel()
File "lib/python3.8/site-packages/rq/job.py", line 298, in fetch
job = cls(id, connection=connection, serializer=serializer)
File "lib/python3.8/site-packages/rq/job.py", line 327, in __init__
self.connection = resolve_connection(connection)
File "lib/python3.8/site-packages/rq/connections.py", line 69, in resolve_connection
raise NoRedisConnectionException('Could not resolve a Redis connection')
rq.connections.NoRedisConnectionException: Could not resolve a Redis connectio
queue_manager.py
import atexit
import os
import redis
import rq
import signal
import subprocess
from rq.job import cancel_job
from rq.registry import StartedJobRegistry
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class QueueManager(metaclass=Singleton):
"""Simple rq-based queue manager. Handles worker & queue setup and deletion
Kwargs:
default_timeout (int): timeout after which the worker will cancel its job
"""
def __init__(self, default_timeout=86400):
self._conn = redis.Redis()
self._queues = {}
self._workers = {}
self._default_timeout = default_timeout
atexit.register(self.clear)
def _spawn_worker(self, name, queue):
pid = subprocess.Popen(['rqworker', '-n', name, queue]).pid
print(f'Worker {name} was spawned (pid: {pid})')
self._workers[name] = pid
def _add_queue(self, name):
self._queues[name] = rq.Queue(
connection=self._conn,
default_timeout=self._default_timeout,
name=name,
)
print(f'Queue {name} was created')
def enqueue(self, queue_name, func, func_arg_ls):
try:
self._queues[queue_name].enqueue(func, *func_arg_ls)
except KeyError:
self._add_queue(queue_name)
self._spawn_worker(queue_name, queue_name)
self._queues[queue_name].enqueue(func, *func_arg_ls)
def status(self):
return subprocess.run(['rqinfo'], stdout=subprocess.PIPE).stdout.decode()
def clear(self):
"""Clears all queues and workers"""
for q in self._queues:
for job_id in StartedJobRegistry(q, connection=self._conn).get_job_ids():
cancel_job(job_id)
self._conn.flushdb()
for worker in self._workers:
os.kill(self._workers[worker], signal.SIGTERM)
print(f'Worker {worker} (pid: {self._workers[worker]}) was terminated.')
x.py
import time
def f(n):
time.sleep(n)
How to reproduce:
Python 3.8.0 (default, Nov 6 2019, 15:26:17)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.18.1 -- An enhanced Interactive Python. Type '?' for help.
In [1]: from queue_manager import QueueManager
In [2]: from x import f
In [3]: q = QueueManager()
In [4]: q.enqueue('queue_name', f, (1000,))
Queue queue_name was created
Worker queue_name was spawned (pid: 25822)
In [5]: 20:13:50 Worker rq:worker:queue_name: started, version 1.5.2
20:13:50 *** Listening on queue_name...
20:13:50 Cleaning registries for queue: queue_name
20:13:50 queue_name: x.f(1000) (149e7d64-26fb-497a-8e3a-bc1dc08d571f)
In [5]:
In [5]: q.clear()
Upvotes: 0
Views: 1146
Reputation: 10227
You forgot to pass your connection to cancel_job
:
cancel_job(job_id, connection=self._conn)
Upvotes: 1