ashim888

Reputation: 206

Celery: calling a different function and continuing the chaining process

I have a chain of three tasks: fetch_page, check_source and store_info.

def update_page_info(**headers):
    chain = fetch_page.s(headers['key']) | check_source.s(headers['key_1']) | store_info.s()
    chain.apply_async()
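For context on how the chained arguments line up: each task's return value is prepended to the partial arguments given in `.s(...)`. A pure-Python sketch of that calling convention (stand-in functions, no Celery required; the URL and handle values are made up for illustration):

```python
def fetch_page(url):
    # returns the tuple that the next task in the chain will receive
    return ("<html>...</html>", url)

def check_source(page_and_url, handle):
    # receives fetch_page's result first, then the argument bound by .s()
    page, url = page_and_url
    return {"url": url, "handle": handle}

def store_info(result):
    # just passes the previous result through
    return result

# simulating: fetch_page.s(key) | check_source.s(key_1) | store_info.s()
result = store_info(check_source(fetch_page("http://example.com"), "first_option"))
```

This is why `check_source` takes `page_and_url` before `handle`, even though only `handle` appears in its signature in the chain.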

fetch_page fetches the page and gathers what it needs:

@app.task(bind=True)
def fetch_page(self, url):
    # fetch the page here and return a tuple so the next task can unpack it
    page = fetch(url)  # placeholder for the actual fetching logic
    return (page, url)

After fetching the page, the next task, check_source, checks its source:

@app.task(bind=True)
def check_source(self, page_and_url, handle):
    try:
        # unpack your stuff here
        page, url = page_and_url
        get_result = {}

        if handle == 'first_option':
            get_result = select_first_option(page, url)
        elif handle == 'second_option':
            get_result = select_second_option(page, url)
        elif handle == 'third_option':
            get_result = select_third_option(page, url)
        else:
            return "IGNORE FOR NOW"
        return get_result
    except Exception as exc:
        # don't swallow the error silently; re-raise so Celery records the
        # failure (or call self.retry(exc=exc) if the task should be retried)
        raise
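As a side note, the if/elif ladder over handle can be collapsed into a dispatch table, which makes adding a fourth option a one-line change. A minimal pure-Python sketch, where the select_* functions are stand-ins for the real handlers:

```python
# Stand-in handlers; the real select_* functions would go here.
def select_first_option(page, url):
    return {'option': 'first', 'url': url}

def select_second_option(page, url):
    return {'option': 'second', 'url': url}

def select_third_option(page, url):
    return {'option': 'third', 'url': url}

# Map each handle string to its handler function.
HANDLERS = {
    'first_option': select_first_option,
    'second_option': select_second_option,
    'third_option': select_third_option,
}

def check_source_body(page_and_url, handle):
    page, url = page_and_url
    handler = HANDLERS.get(handle)
    if handler is None:
        return "IGNORE FOR NOW"
    return handler(page, url)
```

The task body would then just call `check_source_body(page_and_url, handle)` and return the result.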

So the confusion is: can I call some other task from here? Will there be any inconsistency, or will the worker ever get into a deadlock doing this?

And finally it should execute store_info(), which just stores whatever check_source() returns:

@app.task(bind=True)
def store_info(self, result):
    print("store_info")
    try:
        # store the fetched pages
        pass
    except Exception as exc:
        # do something (log the error, self.retry(exc=exc), ...)
        raise

I was following this approach from the Celery docs, which just needed a little modification: http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks

Can anybody suggest how this should be done and what I need to be more careful about?

Upvotes: 1

Views: 961

Answers (1)

deeb

Reputation: 1382

This should all work the way you are describing it. The three tasks will execute in order without any inconsistency.

If you call update_page_info once, the three chained subtasks will run exclusively of each other. That said, there is still the potential for deadlocks with this setup: if you call update_page_info again while tasks from a previous call are still running, more than one task could be running at once. That would introduce the potential for deadlocks, depending on how your tasks share resources.

If your tasks share resources, I would suggest using something like Redis or memcached as a locking system across workers.
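For example, a cross-worker lock can be built on Redis's SET with the NX and EX flags. A minimal sketch (the function names and key name here are made up for illustration; `client` is anything with a redis-py-style set/get/delete API):

```python
import uuid

def acquire_lock(client, name, ttl=60):
    # Try to take a cross-worker lock with SET NX EX: set the key only if it
    # does not exist, with an expiry so a dead worker can't hold it forever.
    # Returns a token on success, or None if another worker holds the lock.
    token = uuid.uuid4().hex
    if client.set(name, token, nx=True, ex=ttl):
        return token
    return None

def release_lock(client, name, token):
    # Release only if we still own the lock (our token still matches), so we
    # never delete a lock that expired and was re-taken by another worker.
    if client.get(name) == token:
        client.delete(name)
        return True
    return False
```

A task would acquire the lock before touching the shared resource, and call self.retry(...) if acquire_lock returns None rather than blocking inside the worker. Note the get-then-delete in release_lock is not atomic; redis-py's built-in Lock object (`r.lock(...)`) handles that with a Lua script and is the safer choice in production.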

Edit: the code as it now stands is totally fine, since the results are passed along as parameters to the next task.

Upvotes: 1
