Reputation: 93
I am new to Celery. I am trying to make a distributed task work with Celery.
Let's say I have a single task in my tasks file task.py:
@celery.task
def generate_sorts(params, meta_data_dict):
    '''
    This will generate some sorts.
    '''
    pass
And I am doing the following as part of some distributed processing:
taskset = TaskSet(
    tasks.generate_sorts.subtask(args=(params, meta_data_dict))
    for meta_data_dict in chunk_generator)
print "Dispatching tasks"
taskset_result = taskset.apply_async()
print "Waiting for results"
results = taskset_result.join_native()
print "Results:"
# Process the results.
Now chunk_generator is basically a generator that goes to the database and fetches some metadata. My problem is that these tasks accumulate before eventually being sent to the task queue. My generator takes around 30 minutes to fetch all the metadata before the tasks are actually added to the queue. I know that is how TaskSet is intended to behave. I am looking for an alternative to TaskSet, i.e. a way to perform the equivalent of the following in a distributed manner:
pool.imap_unordered(generate_sorts, chunk_generator)
The above calls generate_sorts as soon as the generator yields a result. In other words, is there an alternative to TaskSet where I can add tasks as soon as the generator yields its first job, instead of waiting for the generator to fetch everything before I can eventually start doing some work?
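For context, chunk_generator is roughly of the following shape (the connection object, query, and column names below are hypothetical placeholders, not my real schema):
def chunk_generator():
    # Hypothetical sketch: yields one metadata dict per database row.
    # Each fetch is slow, which is why the full scan takes ~30 minutes.
    cursor = db_connection.cursor()  # db_connection is assumed to exist
    cursor.execute("SELECT id, payload FROM metadata")  # placeholder query
    for row_id, payload in cursor:
        yield {'id': row_id, 'payload': payload}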
Upvotes: 0
Views: 2996
Reputation: 7707
You should try starting them immediately and adding the resulting AsyncResult instances to a ResultSet:
from celery.result import ResultSet

result_set = ResultSet([])  # ResultSet takes an initial list of results
for meta_data_dict in chunk_generator:
    # Add the task to the queue immediately
    result = tasks.generate_sorts.delay(params, meta_data_dict)
    result_set.add(result)

print "Waiting for results"
results = result_set.join_native()
print "Results:"
# Process the results
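If you also want to consume results as each task finishes, closer to the imap_unordered semantics you mentioned, the ResultSet API in the Celery 2.x/3.x line (the same versions that still ship TaskSet) has an iterate() method that yields return values as they become ready (it was later deprecated in favor of passing a callback to join()/get()). A minimal sketch, assuming the same tasks module and chunk_generator as above, with process_result as a hypothetical placeholder for your own handling:
from celery.result import ResultSet

result_set = ResultSet([])
for meta_data_dict in chunk_generator:
    # Each task is queued as soon as the generator yields its dict
    result_set.add(tasks.generate_sorts.delay(params, meta_data_dict))

# iterate() polls the result backend and yields each return value as
# soon as it is ready, so you can process early results while the
# remaining tasks are still running
for value in result_set.iterate():
    process_result(value)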
Upvotes: 1