Celery define workflows on the fly

I am using Python 3.6.6 and Celery 4.2.0.

I am trying to manage dynamic task workflows which could change on the fly. Workflows may contain long and short duration steps.

For example, I initially have the following task workflow:

Initial workflow

But at some point I have to add another task, D, which depends on A, so D must wait until A finishes:

Desired workflow

    from __future__ import absolute_import
    from celery import subtask, signals
    from pymemcache.client import base
    from test_celery.celery import app
    import time

    def get_task_uuid(task):
        # task is a (body, task_name) pair. The body can hold unhashable
        # lists or dicts, so hash its repr instead of building a frozenset.
        return str(hash(repr(task)))

    @app.task
    def add(x, y):
        print('add({},{}) = {} | {}'.format(x, y, x+y, time.time()))
        return x+y

    @app.task
    def sub(x, y):
        print('sub({},{}) = {} | {}'.format(x, y, x-y, time.time()))
        return x-y

    @app.task
    def mul(x, y):
        # Simulates a long-running step.
        time.sleep(10)
        print('mul({},{}) = {} | {}'.format(x, y, x*y, time.time()))
        return x*y

    @signals.before_task_publish.connect
    def before_task_publish(body, exchange, routing_key, headers, properties, retry_policy, **kw):
        # Identify the task being published by its body and registered name.
        task = (body, headers['task'])
        uuid = get_task_uuid(task)

I've been looking for a possible approach, for example listening to task signals so that D runs as soon as task A succeeds (signals.task_success). Any ideas?

Upvotes: 1

Views: 357

Answers (1)

safhac

Reputation: 1149

You could use chains and link the result of the first task to whichever task you want to call, depending on that result.

Upvotes: 0
