Reputation: 10342
I have a large number of documents to process and am using Python RQ to parallelize the task.
I would like a pipeline of work to be performed on each document, with a different operation at each stage. For example: A -> B -> C means pass the document to function A; after A is done, proceed to B; and finally C.
However, Python RQ does not seem to support pipelines very nicely.
Here is a simple but somewhat dirty way of doing it. In a word, each function along the pipeline enqueues the next one in a nested way.
For example, for a pipeline A -> B -> C, at the top level some code is written like this:
q.enqueue(A, the_doc)
where q is the Queue instance, and in function A there is code like:
q.enqueue(B, the_doc)
And in B there is something like this:
q.enqueue(C, the_doc)
Is there a more elegant way of doing this? For example, some code in ONE function:
q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after=A)
q.enqueue(C, the_doc, after=B)
The depends_on parameter is the closest to my requirement; however, running something like:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)
won't work, since q.enqueue(B, depends_on=A_job) is executed immediately after A_job = q.enqueue(A, the_doc). By the time B is enqueued, the result from A might not be ready, as it takes time to process.
PS:
If Python RQ is not really suited to this, what other tool in Python can I use to achieve the same purpose?
Upvotes: 6
Views: 5217
Reputation: 4643
By the time B is enqueued, the result from A might not be ready as it takes time to process.
I'm not sure whether this was actually true when you originally posted the question, but in any case it is not true now. In fact, the depends_on feature is made exactly for the workflow you described.
It is true that these two calls are executed immediately in succession:
A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job)
But the worker will not execute B until A is finished. Until A_job has been successfully executed, B_job.status == 'deferred'. Once A_job.status == 'finished', B will start to run.
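You can check this from the client side with Job.get_status(); a minimal sketch, reusing the A_job and B_job handles from above:
print(A_job.get_status())   # 'queued', then 'started' once a worker picks it up
print(B_job.get_status())   # 'deferred' until A_job has finished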
This means that B and C can access and operate on the result of their dependencies, like this:
import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)
    return 'result A'

def B():
    time.sleep(100)
    # Look up the job this one depends on and read its result
    current_job = get_current_job(conn)
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'

def C():
    time.sleep(100)
    # Same pattern: fetch the dependency's result and build on it
    current_job = get_current_job(conn)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'
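For completeness, the whole chain can then be enqueued like this (a minimal sketch; the C_job name is mine):
A_job = q.enqueue(A)
B_job = q.enqueue(B, depends_on=A_job)
C_job = q.enqueue(C, depends_on=B_job)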
The worker will eventually print 'result A result B result C'.
Also, if you have many jobs in the queue and B might be waiting a while before being executed, you might want to significantly increase result_ttl or make it indefinite with result_ttl=-1. Otherwise, the result of A will be purged after however many seconds are set for result_ttl, in which case B will no longer be able to access it and return the desired result.
Setting result_ttl=-1 has important memory implications, however. It means the results of your jobs will never be automatically purged, and memory will grow proportionately until you manually remove those results from Redis.
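If you do tune it, result_ttl is passed per job at enqueue time; a small sketch (the values here are only examples):
A_job = q.enqueue(A, result_ttl=86400)                 # keep A's result for one day
B_job = q.enqueue(B, depends_on=A_job, result_ttl=-1)  # keep B's result until removed manually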
Upvotes: 4
Reputation: 1
depends_on parameter is the closest one to my requirement, however, running something like:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job)
won't work. As q.enqueue(B, depends_on=A_job) is executed immediately after A_job = q.enqueue(A, the_doc) is executed. By the time B is enqueued, the result from A might not be ready as it takes time to process.
For this case, the job enqueued with q.enqueue(B, depends_on=A_job) will only run once A_job finishes. If the result is not ready yet, it will wait until it is.
RQ does not support pipelines out of the box, but it is possible with the help of other tools.
In my case, I used caching to keep track of the previous job in the chain, so that when we want to enqueue a new function (to run right after it) we can properly set its depends_on parameter when calling enqueue().
Note the use of the additional parameters to enqueue: timeout, result_ttl, and ttl. These were used because I was running long jobs on RQ. You can find their usage in the Python RQ docs.
I used django_rq.enqueue(), which is built on top of Python RQ.
# main.py
def process_job():
    ...
    # Create a cache key for every chain of methods you want to call.
    # NOTE: I used this for web development; in your case you may want
    # to use a variable or a database, not caching.
    # Number of seconds to cache and to keep the results in rq
    TWO_HRS = 60 * 60 * 2

    cache_key = 'update-data-key-%s' % obj.id
    previous_job_id = cache.get(cache_key)

    job = django_rq.enqueue(update_metadata,
                            campaign=campaign,
                            list=chosen_list,
                            depends_on=previous_job_id,
                            timeout=TWO_HRS,
                            result_ttl=TWO_HRS,
                            ttl=TWO_HRS)

    # Store the id of the job just enqueued, so the next function
    # in the chain can set the proper value for 'depends_on'
    cache.set(cache_key, job.id, TWO_HRS)
# utils.py
def update_metadata(campaign, list):
    # Your code goes here to update the campaign object with the list object
    pass
'depends_on' - from the rq docs:
depends_on - specifies another job (or job id) that must complete before this job will be queued
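Since depends_on accepts a job id as well as a Job instance, the cached previous_job_id string above can be passed directly. A minimal sketch of the two equivalent forms (the job_1/job_2 names are mine):
job_2 = django_rq.enqueue(update_metadata, campaign=campaign, list=chosen_list,
                          depends_on=job_1)     # a Job instance
job_2 = django_rq.enqueue(update_metadata, campaign=campaign, list=chosen_list,
                          depends_on=job_1.id)  # or its id string, e.g. read from the cache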
Upvotes: 0