Reputation: 206
I have a chain with three tasks: fetch_page, check_source and store_info.
def update_page_info(**headers):
    chain = fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
    chain.apply_async()
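Calling it looks like this (the header values are just placeholders):

    update_page_info(key='http://example.com', key_1='first_option')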
fetch_page fetches the page and gathers what it needs to gather:
@app.task(bind=True)
def fetch_page(self, url):
    # fetch the page here and return a tuple so that
    # the next task in the chain can unpack it
    page = do_something(url)  # placeholder for the actual fetching
    return (page, url)
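For example, a concrete version might look like this (just a sketch that assumes the page is fetched over HTTP with the requests library; the timeout and retry values are arbitrary):

    import requests

    @app.task(bind=True, max_retries=3)
    def fetch_page(self, url):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
        except requests.RequestException as exc:
            # let Celery retry the fetch instead of failing the whole chain
            raise self.retry(exc=exc, countdown=30)
        # return a tuple so that check_source can unpack (page, url)
        return (response.text, url)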
After fetching the page, the next task, check_source, checks the source:
@app.task(bind=True)
def check_source(self, page_and_url, handle):
    try:
        # unpack your stuff here
        page, url = page_and_url
        get_result = {}
        if handle == 'first_option':
            get_result = select_first_option(page, url)
        elif handle == 'second_option':
            get_result = select_second_option(page, url)
        elif handle == 'third_option':
            get_result = select_third_option(page, url)
        else:
            return "IGNORE FOR NOW"
        return get_result
    except Exception as exc:
        # note: swallowing the exception makes the task return None,
        # so store_info will still run and receive None
        pass
So my confusion is: can I call some other tasks from here? Will there be any inconsistency, or will the worker ever get into a deadlock doing this?
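What I have in mind is something like this, where some_other_task is a hypothetical extra task shown only to illustrate the question:

    @app.task(bind=True)
    def check_source(self, page_and_url, handle):
        page, url = page_and_url
        # enqueue another task without waiting for its result;
        # .delay() returns immediately after publishing the message
        some_other_task.delay(page, url)
        ...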
And finally it should execute store_info(), which just stores the things returned from check_source():
@app.task(bind=True)
def store_info(self, result):
    print("store_info")
    try:
        # store the fetched pages
        pass
    except Exception as exc:
        # do something
        pass
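A filled-in version could look something like this (a minimal sketch; the JSON-lines file and the retry policy are my own assumptions, not part of the original code):

    import json

    @app.task(bind=True, max_retries=3)
    def store_info(self, result):
        print("store_info")
        try:
            # append each result as one JSON line; a database write
            # would slot in here the same way
            with open('pages.jsonl', 'a') as f:
                f.write(json.dumps(result) + '\n')
        except Exception as exc:
            # retry the store later instead of silently losing the result
            raise self.retry(exc=exc, countdown=60)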
I was following this approach, which just needed a little modification: http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
Can anybody suggest how it should be done and what I need to be more careful about?
Upvotes: 1
Views: 961
Reputation: 1382
This should all work the way you are reading (and communicating) it. The three tasks will execute in order without any "inconsistency."
If you call update_page_info one time, the three chained sub-tasks will run exclusively of each other. That said, there is still the potential for deadlocks with this setup. If you call update_page_info while previous tasks from the last time you called it are still running, then you could get more than one task running at once. This would introduce the potential for deadlocks, depending on how your tasks share resources.
If your tasks share resources, I would suggest using something like Redis or memcached as a locking system across workers.
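For example, a minimal sketch with the redis-py library (the lock name and the timeouts are arbitrary choices, and guarding store_info is just for illustration):

    import redis

    redis_client = redis.Redis()  # assumes a Redis server on localhost

    @app.task(bind=True)
    def store_info(self, result):
        # only one worker at a time can hold the lock; the timeout ensures
        # the lock is released even if this worker dies mid-task
        with redis_client.lock('store_info_lock', timeout=60, blocking_timeout=10):
            pass  # work with the shared resource here

If the lock cannot be acquired within blocking_timeout seconds, redis-py raises a LockError, which Celery records as a task failure.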
Edit: the code I see now is totally fine, as the results are passed along as parameters to the next task.
Upvotes: 1