Reputation: 1092
I'm working with a django application hosted on heroku with redistogo addon:nano pack. I'm using rq, to execute tasks in the background - the tasks are initiated by online users. I've a constraint on increasing number of connections, limited resources I'm afraid.
I'm currently having a single worker running over 'n' number of queues. Each queue uses an instance of connection from the connection pool to handle 'n' different types of task. For instance, lets say if 4 users initiate same type of task, I would like to have my main worker create child processes dynamically, to handle it. Is there a way to achieve required multiprocessing and concurrency?
I tried with multiprocessing
module, initially without introducing Lock()
; but that exposes and overwrites user passed data to the initiating function, with the previous request data. After applying locks, it restricts second user to initiate the requests by returning a server error - 500
github link #1: Looks like the team is working on the PR; not yet released though!
github link #2: This post helps to explain creating more workers at runtime. This solution however also overrides the data. The new request is again processed with the previous requests data.
Let me know if you need to see some code. I'll try to post a minimal reproducible snippet.
Any thoughts/suggestions/guidelines?
Upvotes: 0
Views: 1958
Reputation:
Did you get a chance to try AutoWorker?
Spawn RQ Workers automatically.
from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()
It makes use of multiprocessing
with StrictRedis
from redis
module and following imports from rq
from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus
Upvotes: 2
Reputation: 1092
After looking under the hood, I realised Worker
class is already implementing multiprocessing.
The work
function internally calls execute_job(job, queue)
which in turn as quoted in the module
Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes within the given timeout bounds,
or will end the work horse with SIGALRM.
The execute_job()
funtion makes a call to fork_work_horse(job, queue)
implicitly which spawns a work horse to perform the actual work and passes it a job as per the following logic:
def fork_work_horse(self, job, queue):
child_pid = os.fork()
os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job, queue)
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
The main_work_horse
makes an internal call to perform_job(job, queue)
which makes a few other calls to actually perform the job.
All the steps about The Worker Lifecycle mentioned over rq's official documentation page are taken care within these calls.
It's not the multiprocessing I was expecting, but I guess they have a way of doing things. However my original post is still not answered with this, also I'm still not sure about concurrency..
The documentation there still needs to be worked upon, since it hardly covers the true essence of this library!
Upvotes: 0