Nikhil Singh
Nikhil Singh

Reputation: 665

Celery not running chord callback

After looking at a lot of articles about chord callbacks not executing and trying their solutions, I am still unable to get it to work. In fact, the chord_unlock method is also not getting executed for some reason.

celery.py

from __future__ import absolute_import
from celery import Celery

app = Celery('sophie',
             broker='redis://localhost:6379/2',
             backend='redis://localhost:6379/2',
             include=['sophie.lib.chord_test'])

app.conf.update(
    CELERY_ACCEPT_CONTENT=["json"],
    CELERY_TASK_SERIALIZER="json",
    CELERY_TRACK_STARTED=True,
    CELERYD_PREFETCH_MULTIPLIER=1,   # NO PREFETCHING OF TASKS
    BROKER_TRANSPORT_OPTIONS = {
        'priority_steps': [0, 1]     # ALLOW ONLY 2 TASK PRIORITIES
    }
)

if __name__ == '__main__':
    app.start()

chord_test.py

from __future__ import absolute_import
from sophie.celery import app
from celery import chord

@app.task(name='sophie.lib.add')
def add(x, y):
    return x + y

@app.task(name='sophie.lib.tsum')
def tsum(numbers):
    return sum(numbers)

if __name__ == '__main__':
    tasks = [add.s(100, 100), add.s(200, 200)]
    chord(tasks, tsum.s()).apply_async()

The output of my worker logfile is as follows

$ celery worker -l info --app=sophie.celery -n worker1.%h

 -------------- [email protected] v3.1.6 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.0.0-12-server-x86_64-with-Ubuntu-11.10-oneiric
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> broker:      redis://localhost:6379/2
- ** ---------- .> app:         sophie:0x3554250
- ** ---------- .> concurrency: 1 (prefork)
- *** --- * --- .> events:      OFF (enable -E to monitor this worker)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . sophie.lib.add
  . sophie.lib.tsum

[2013-12-12 19:37:26,499: INFO/MainProcess] Connected to redis://localhost:6379/2
[2013-12-12 19:37:26,506: INFO/MainProcess] mingle: searching for neighbors
[2013-12-12 19:37:27,512: INFO/MainProcess] mingle: all alone
[2013-12-12 19:37:27,527: WARNING/MainProcess] [email protected] ready.
[2013-12-12 19:37:29,723: INFO/MainProcess] Received task: sophie.lib.add[b7d504c1-217f-43a9-b57e-86f0fcbdbe22]
[2013-12-12 19:37:29,734: INFO/MainProcess] Task sophie.lib.add[b7d504c1-217f-43a9-b57e-86f0fcbdbe22] succeeded in 0.009769904
00522s: 200
[2013-12-12 19:37:29,735: INFO/MainProcess] Received task: sophie.lib.add[eb01a73e-f6c8-401d-8049-6cdbc5f0bd90]
[2013-12-12 19:37:29,737: INFO/MainProcess] Task sophie.lib.add[eb01a73e-f6c8-401d-8049-6cdbc5f0bd90] succeeded in 0.001446505
00442s: 400

There is no chord_unlock being called at all. Some more output to give further context:

$ sudo pip freeze | egrep 'celery|kombu|billiard'
billiard==3.3.0.12
celery==3.1.6
kombu==3.0.7

$ uname -a
Linux vagrant-ubuntu-11 3.0.0-12-server #20-Ubuntu SMP Fri Oct 7 16:36:30 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux

$ redis-server --version
Redis server version 2.2.11 (00000000:0)

Upvotes: 5

Views: 6278

Answers (4)

Gabriel Muj
Gabriel Muj

Reputation: 3805

For me the callback function was never executed because the celery result expires time was set very low, so beside CELERY_RESULT_BACKEND and ignore_result=False as stated in other comment, you should check this setting just in case CELERY_RESULT_EXPIRES

Link to documentation: https://docs.celeryq.dev/en/stable/userguide/configuration.html#std-setting-result_expires

Upvotes: 0

Kipr
Kipr

Reputation: 1076

Happened to me when my task signatures did not match the argments provided by the flow, accepting a *args might help debug such an issue.

Upvotes: 1

Ikar Pohorský
Ikar Pohorský

Reputation: 5039

There is an Important Notes section under the chords documentation that says:

Tasks used within a chord must not ignore their results. In practice this means that you must enable a CELERY_RESULT_BACKEND in order to use chords. Additionally, if CELERY_IGNORE_RESULT is set to True in your configuration, be sure that the individual tasks to be used within the chord are defined with ignore_result=False. This applies to both Task subclasses and decorated tasks.

Even when I followed the recommendations the chord callback is not executed, but might help others.

EDIT:

Found what it was in my case! The chord doesn't fully support specifying queue name in all cases: https://github.com/celery/celery/issues/2085

Upvotes: 1

Lycha
Lycha

Reputation: 10177

The chords documentation uses the following syntax for chord:

 result = chord(header)(callback)

So you could do:

 chord(tasks)(tsum.s())

Upvotes: 4

Related Questions