Reputation: 632
I used to have a function like this
def calculate(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result)
return result
Where result_higher_then_four
obviously represents a signal.
Then I introduced celery and my function looked like below and I never received a signal again. I suppose signals are bound per process and as celery runs in a different process, this means I cannot catch the signal in the main process. Should I use a thread_local
to fix this? Or am I overlooking the obvious?
Thanks
@task
def calculate(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result)
return result
Upvotes: 12
Views: 9240
Reputation: 4760
You can use the celeryd_init signal to initialize your workers and signals http://celery.readthedocs.org/en/latest/userguide/signals.html#celeryd-init
Based on what you provided, I've tested with:
from celery.signals import celeryd_init
from celery.utils.dispatch import Signal
def process_result(result, *args, **kwargs):
print "signals received: %s" % result
result_higher_then_four = Signal()
@celeryd_init.connect
def init_signals(*args, **kwargs):
result_higher_then_four.connect(process_result)
@task(bind=True)
def calculate(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result=result, sender=self)
return result
Upvotes: 4
Reputation: 2519
The problem is that the signal receiver isn't getting registered. The celery workers run in their own process so the signal connections need to be made in that process. If you know what they are or can discover them, you can register them during task initialization using this technique.
Of course, that eliminates some of the benefit of using signals in the first place because you need to know the connections in advance.
One idea is to assume that the signal receivers will always register in the models module of each app. In which case the following will work.
class CalculateTask(celery.Task):
def __init__(self):
from django.conf import settings
for app in settings.INSTALLED_APPS:
app_models = '{}.{}'.format(app,'models')
__import__(app_models, globals=globals())
def run(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result)
return result
Upvotes: 4