Reputation: 32959
Recently, I have been going through the Celery and Kombu documentation, as I need them integrated into one of my projects. I have a basic understanding of how this should work, but the documentation examples using different brokers have me confused.
Here is the scenario:
Within my application I have two views, ViewA and ViewB. Both of them do some expensive processing, so I wanted them to use Celery tasks for it. This is what I did:
def ViewA(request):
    tasks.do_task_a.apply_async(args=[a, b])

def ViewB(request):
    tasks.do_task_b.apply_async(args=[a, b])

@task()
def do_task_a(a, b):
    # Do something expensive
    pass

@task()
def do_task_b(a, b):
    # Do something expensive here too
    pass
Until now, everything is working fine. The problem is that do_task_a creates a txt file on the system, which I need to use in do_task_b. Right now, do_task_b checks whether the file exists and calls the task's retry method if it does not.
I would rather take a different approach here (this is where messaging comes in). I want do_task_a to send a message to do_task_b once the file has been created, instead of looping on the retry method until the file exists.
I read through the Celery and Kombu documentation and updated my settings as follows:
BROKER_URL = "django://"
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "sqlite:///celery"
TASK_RETRY_DELAY = 30  # Retry delay in seconds
DATABASE_ROUTERS = ['portal.db_routers.CeleryRouter']
CELERY_QUEUES = (
    Queue('filecreation', exchange=exchanges.genex, routing_key='file.create'),
)
CELERY_ROUTES = ('celeryconf.routers.CeleryTaskRouter',)
And I am stuck here; I don't know where to go next.
What should I do next to make do_task_a broadcast a message to do_task_b on file creation? And what should I do to make do_task_b receive (consume) the message and continue processing?
Any ideas and suggestions are welcome.
Upvotes: 0
Views: 118
Reputation: 1620
This is a good use case for Celery's callback (linking) feature.
Celery supports linking tasks together so that one task follows another. You can read more about it in the Celery documentation on calling tasks.
apply_async() has two optional arguments:
+ link: execute the linked task on success
+ link_error: execute the linked task on an error
from celery.result import AsyncResult

@task
def add(a, b):
    return a + b

@task
def total(numbers):
    return sum(numbers)

@task
def error_handler(uuid):
    # Look up the failed task by id and report its exception and traceback.
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task %r raised exception: %r\n%r' % (uuid, exc, result.traceback))
Now, in your calling function, do something like this:
def main():
    # For error handling
    add.apply_async((2, 2), link_error=error_handler.subtask())
    # For linking two tasks
    add.apply_async((2, 2), link=add.subtask((8, )))
    # The linked task computes 4 + 8 = 12
    # What you can do in your case is something like this:
    if user_requires:
        add.apply_async((2, 2), link=add.subtask((8, )))
    else:
        add.apply_async((2, 2))
Hope this is helpful
Upvotes: 1