Ronnie

Reputation: 1092

Concurrency within redis queue

I'm working with a Django application hosted on Heroku with the redistogo add-on (nano pack). I'm using rq to execute tasks in the background - the tasks are initiated by online users. I have a constraint on increasing the number of connections - limited resources, I'm afraid.

I currently have a single worker running over 'n' queues. Each queue uses an instance of a connection from the connection pool to handle 'n' different types of tasks. For instance, if 4 users initiate the same type of task, I would like my main worker to create child processes dynamically to handle them. Is there a way to achieve the required multiprocessing and concurrency?

I tried the multiprocessing module, initially without introducing Lock(); but that exposes the data a user passed to the initiating function and overwrites it with the previous request's data. After applying locks, it restricts the second user from initiating a request by returning a server error - 500.

github link #1: Looks like the team is working on the PR; not yet released though!

github link #2: This post helps explain creating more workers at runtime. However, this solution also overrides the data: the new request is again processed with the previous request's data.

Let me know if you need to see some code. I'll try to post a minimal reproducible snippet.
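In the meantime, here is a rough sketch of the current setup (the queue names, the process_request task, and the Redis URL fallback are placeholders, not my real code):

# Minimal sketch of the setup; queue names and the task function
# are placeholders.
import os

import redis
from rq import Queue, Worker

# REDISTOGO_URL is set by the Heroku add-on.
pool = redis.ConnectionPool.from_url(
    os.environ.get('REDISTOGO_URL', 'redis://localhost:6379'))
conn = redis.Redis(connection_pool=pool)

# 'n' queues, one per task type.
queues = [Queue(name, connection=conn) for name in ('typeA', 'typeB', 'typeC')]

# A Django view enqueues a job when a user initiates a task, e.g.:
#   queues[0].enqueue(process_request, user_id, payload)

# A single worker process draining all 'n' queues.
Worker(queues, connection=conn).work()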

Any thoughts/suggestions/guidelines?

Upvotes: 0

Views: 1958

Answers (2)

user11470096

Reputation:

Did you get a chance to try AutoWorker?

Spawn RQ Workers automatically.

from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()

It makes use of multiprocessing with StrictRedis from the redis module and the following imports from rq:

from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus
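If it helps, my rough reading of what it does under the hood is close to the sketch below (not AutoWorker's actual code): each child process opens its own Redis connection and runs a burst worker over the same queue.

# Rough sketch of the idea behind AutoWorker, not its actual code:
# spawn several worker processes over the same queue via multiprocessing.
from multiprocessing import Process

from redis import StrictRedis
from rq import Queue, Worker


def run_worker(queue_name):
    # Each child process opens its own Redis connection.
    conn = StrictRedis()
    Worker([Queue(queue_name, connection=conn)], connection=conn).work(burst=True)


if __name__ == '__main__':
    # Spawn up to max_procs worker processes over the same queue.
    procs = [Process(target=run_worker, args=('high',)) for _ in range(6)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()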

Upvotes: 2

Ronnie

Reputation: 1092

After looking under the hood, I realised that the Worker class already implements multiprocessing.

The work function internally calls execute_job(job, queue), which, as quoted in the module,

Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes within the given timeout bounds, or will end the work horse with SIGALRM.

The execute_job() function implicitly makes a call to fork_work_horse(job, queue), which spawns a work horse to perform the actual work and passes it a job, as per the following logic:


def fork_work_horse(self, job, queue):
    child_pid = os.fork()
    os.environ['RQ_WORKER_ID'] = self.name
    os.environ['RQ_JOB_ID'] = job.id
    if child_pid == 0:
        self.main_work_horse(job, queue)
    else:
        self._horse_pid = child_pid
        self.procline('Forked {0} at {1}'.format(child_pid, time.time()))


The main_work_horse makes an internal call to perform_job(job, queue), which makes a few other calls to actually perform the job.

All the steps of The Worker Lifecycle mentioned on rq's official documentation page are taken care of within these calls.
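Stripped of the rq specifics, that whole chain reduces to something like the sketch below (a bare-bones illustration, not rq's code). Note how the parent waits for each work horse, so each worker still handles jobs strictly one at a time:

# Bare-bones illustration of the work-horse pattern (Unix-only, uses os.fork):
# fork once per job, let the child perform it, and have the parent wait.
import os


def perform_job(job):
    print('work horse %d performing %r' % (os.getpid(), job))


def fork_work_horse(job):
    child_pid = os.fork()
    if child_pid == 0:
        # Child ("work horse"): perform the job and exit immediately,
        # without ever returning to the worker loop.
        perform_job(job)
        os._exit(0)
    else:
        # Parent (the worker): block until the work horse finishes.
        os.waitpid(child_pid, 0)


# Jobs are still handled strictly one after another.
for job in ['job-1', 'job-2', 'job-3']:
    fork_work_horse(job)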

It's not the multiprocessing I was expecting, but I guess they have their own way of doing things. However, this still doesn't answer my original post, and I'm still not sure about concurrency...

The documentation still needs to be worked on, since it hardly covers the true essence of this library!

Upvotes: 0
