Amyth

Reputation: 32959

Using a topic exchange to send a message from one method to another

Recently, I have been going through the Celery and Kombu documentation, as I need to integrate them into one of my projects. I have a basic understanding of how this should work, but the documentation examples using different brokers have me confused.

Here is the scenario:

Within my application I have two views, ViewA and ViewB. Both of them do some expensive processing, so I wanted them to use Celery tasks for that processing. This is what I did:

views.py

def ViewA(request):
    tasks.do_task_a.apply_async(args=[a, b])


def ViewB(request):
    tasks.do_task_b.apply_async(args=[a, b])

tasks.py

@task()
def do_task_a(a, b):
    # Do something expensive
    pass

@task()
def do_task_b(a, b):
    # Do something expensive here too
    pass

So far, everything works fine. The problem is that do_task_a creates a txt file on the system, which I need to use in do_task_b. Right now, do_task_b checks whether the file exists and calls the task's retry method if it does not.

I would rather take a different approach, which is where messaging comes in. I want do_task_a to send a message to do_task_b once the file has been created, instead of looping through the retry method until the file exists.

I read through the documentation of celery and kombu and updated my settings as follows.

BROKER_URL = "django://"
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "sqlite:///celery"
TASK_RETRY_DELAY = 30 #Define Time in Seconds
DATABASE_ROUTERS = ['portal.db_routers.CeleryRouter']
CELERY_QUEUES = ( 
    Queue('filecreation', exchange=exchanges.genex, routing_key='file.create'),
)
CELERY_ROUTES = ('celeryconf.routers.CeleryTaskRouter',)

This is where I am stuck; I don't know where to go from here.

What should I do next to make do_task_a broadcast a message to do_task_b when the file is created? And what should I do to make do_task_b receive (consume) the message and carry on processing?

Any ideas and suggestions are welcome.

Upvotes: 0

Views: 118

Answers (1)

Crazyshezy

Reputation: 1620

This is a good use case for Celery's callback/linking feature.

Celery supports linking tasks together so that one task follows another. You can read more about it in the "Linking (callbacks/errbacks)" section of the Celery documentation on calling tasks.

apply_async() accepts two optional arguments for this:

+link : execute the linked task on success
+link_error : execute the linked task on an error

from celery.result import AsyncResult

@task
def add(a, b):
    return a + b

@task
def total(numbers):
    return sum(numbers)

@task
def error_handler(uuid):
    # uuid is the id of the task that failed
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task %s raised exception: %r\n%r' % (uuid, exc, result.traceback))

Now, in your calling function, do something like:

def main():
    # Error handling: error_handler runs if add fails
    add.apply_async((2, 2), link_error=error_handler.subtask())

    # Linking two tasks: the result of the first add (4) is passed
    # as the first argument to the linked add, which returns 4 + 8 = 12
    add.apply_async((2, 2), link=add.subtask((8, )))

    # In your case, you can do something like this:
    if user_requires:
        add.apply_async((2, 2), link=add.subtask((8, )))
    else:
        add.apply_async((2, 2))

Hope this is helpful

Upvotes: 1
