Reputation: 11
I am using Python 3.6.6 and Celery 4.2.0.
I am trying to manage dynamic task workflows that can change on the fly. Workflows may contain both long- and short-duration steps.
For example: initially I have the following task workflow:
But at some point I have to add another task, D, which depends on A, so it has to wait until A finishes:
from __future__ import absolute_import

import time

from celery import subtask, signals
from pymemcache.client import base

from test_celery.celery import app


def get_task_uuid(task):
    # The original version had unbalanced parentheses and passed two
    # arguments to frozenset(); hash a tuple of the two fields instead
    # (body is stringified because it may contain unhashable dicts).
    return str(hash((str(task[0]), task[1])))


@app.task
def add(x, y):
    print('add({},{}) = {} | {}'.format(x, y, x + y, time.time()))
    return x + y


@app.task
def sub(x, y):
    print('sub({},{}) = {} | {}'.format(x, y, x - y, time.time()))
    return x - y


@app.task
def mul(x, y):
    time.sleep(10)
    print('mul({},{}) = {} | {}'.format(x, y, x * y, time.time()))
    return x * y


@signals.before_task_publish.connect
def before_task_publish(body, exchange, routing_key, headers, properties,
                        retry_policy, **kw):
    task = (body, headers['task'])
    uuid = get_task_uuid(task)
I've been looking into possible approaches, e.g. listening to task signals so that D runs as soon as task A succeeds (signals.task_success). Any ideas?
Upvotes: 1
Views: 357