user2298943

Reputation: 632

Celery and signals

I used to have a function like this:

def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result)

    return result

Here result_higher_then_four is a signal.

Then I introduced Celery, and the function now looks like the one below. Since then I have never received the signal again. I suppose signals are bound to a process, and since Celery runs the task in a different process, I cannot catch the signal in the main process. Should I use a thread_local to fix this, or am I overlooking something obvious?

Thanks

@task
def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result)

    return result

Upvotes: 12

Views: 9240

Answers (2)

ant31

Reputation: 4760

You can use the celeryd_init signal to initialize your workers and connect your signal receivers there: http://celery.readthedocs.org/en/latest/userguide/signals.html#celeryd-init

Based on what you provided, I've tested with:

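from celery.signals import celeryd_init
from celery.utils.dispatch import Signal

# NOTE: `task` is assumed to be your Celery task decorator
# (for example `app.task` on your Celery app instance).

def process_result(result, *args, **kwargs):
    # Receiver: gets the keyword arguments passed to send().
    print("signals received: %s" % result)

# Custom signal sent by the task below.
result_higher_then_four = Signal()

@celeryd_init.connect
def init_signals(*args, **kwargs):
    # celeryd_init fires when the worker starts up, so the receiver
    # is connected inside the worker rather than in the calling process.
    result_higher_then_four.connect(process_result)

@task(bind=True)
def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result=result, sender=self)

    return result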
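
To see why the connect call has to run inside the worker, here is a minimal sketch that needs neither Celery nor a broker. ToySignal is a hypothetical stand-in I made up for illustration, not Celery's Signal class; it only mimics the connect/send idea. The point is that send() is silently a no-op when nothing has connected a receiver in the current process, which is roughly the situation the worker is in when the receiver is only connected by the calling application.

```python
# Toy stand-in for a signal dispatcher (hypothetical, NOT Celery's class).
class ToySignal:
    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        # Register a receiver once; returning it allows decorator use.
        if receiver not in self._receivers:
            self._receivers.append(receiver)
        return receiver

    def send(self, sender=None, **named):
        # Call every connected receiver, collect (receiver, response) pairs.
        return [(r, r(sender=sender, **named)) for r in self._receivers]


result_higher_then_four = ToySignal()


def process_result(sender=None, result=None, **kwargs):
    return "signal received: %s" % result


def calculate(value):
    result = value * 2
    responses = []
    if result > 4:
        responses = result_higher_then_four.send(sender=calculate, result=result)
    return result, responses


# No receiver connected in this process yet: the send reaches nobody.
before = calculate(3)

# After an init hook has run connect() in this same process, it is delivered.
result_higher_then_four.connect(process_result)
after = calculate(3)
```

Here `before` carries an empty list of responses and `after` carries one, even though the task body is identical in both calls. The only difference is whether the connect call has run in the process that executes the task.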

Upvotes: 4

joshua

Reputation: 2519

The problem is that the signal receiver isn't getting registered. Celery workers run in their own processes, so the signal connections need to be made in those processes. If you know what the receivers are, or can discover them, you can register them during task initialization using this technique.

Of course, that eliminates some of the benefit of using signals in the first place because you need to know the connections in advance.

One idea is to assume that the signal receivers are always registered in the models module of each app, in which case the following will work:

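import importlib

import celery

class CalculateTask(celery.Task):

    def __init__(self):
        # Import each app's models module in the worker so that any signal
        # receivers connected there get registered in this process.
        from django.conf import settings
        for app in settings.INSTALLED_APPS:
            importlib.import_module('{}.{}'.format(app, 'models'))

    def run(self, input):
        result = input * 2
        if result > 4:
            # result_higher_then_four is the signal from the question.
            result_higher_then_four.send(result)

        return result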
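
One caveat with the loop over INSTALLED_APPS above: it raises an ImportError for any app that has no models module. A possible variation, sketched below, skips such apps instead of failing. The helper name import_receiver_modules and its submodule parameter are my own invention for illustration, and it assumes Python 3.6+ for ModuleNotFoundError.

```python
import importlib


def import_receiver_modules(app_names, submodule="models"):
    """Import '<app>.<submodule>' for each app, skipping apps without one.

    Returns the dotted names of the modules that were imported.
    """
    imported = []
    for app in app_names:
        module_name = "{}.{}".format(app, submodule)
        try:
            importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            # Only swallow "this submodule does not exist". Anything else
            # (a missing app, or a broken import inside an existing module)
            # is re-raised so real errors stay visible.
            if exc.name != module_name:
                raise
            continue
        imported.append(module_name)
    return imported
```

Inside the task's __init__ you would then call something like import_receiver_modules(settings.INSTALLED_APPS). You still rely on the convention that receivers are connected in each app's models module.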

Upvotes: 4
