Reputation: 6162
I need to pass additional arguments to my callback function in celery chords. (Celery version : 4.1.0 (latentcall) and Python 2.7)
Consider the below sample:
program.py
from tasks import get_stock_info, call_back
from celery import group, chord
def chord_queue():
header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
callback = call_back.subtask()
header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
res = chord(header,queue='susanoo_dev')(callback)
res1 = chord(header1,queue='susanoo_core')(callback)
print(res.get())
print(res1.get())
print("We are done")
if __name__ == '__main__':
chord_queue()
tasks.py
from pandas_datareader import data
from celery_app import app
import time
@app.task
def get_stock_info(delay):
print('hello Celery--------')
time.sleep(delay)
print('Whats up')
return 10
@app.task
def call_back(num):
print("Everything is done------")
print("Everything is done------")
return sum(num)
celery_app.py
from celery import Celery
from kombu import Queue
app = Celery('tasks', broker='amqp://my_user:my_pass@localhost/my_vhost', backend='redis://localhost:6379/0')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('dev'), Queue('core'),)
}
app.conf.update(**CELERY_CONFIG)
Now in this case, when the chord gets called and after all the 3 get_stock_info tasks are done, the call_back gets called, to which the value 10, which is the return value from the get_stock_info gets passed automatically. Now along with the return values I would also want to pass an additional argument say a string as "abcd" to the callback function.
How do I do it ?
I have already tried doing this as suggested on some blogs/SO answers etc.
program.py
def chord_queue():
header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
callback = call_back.subtask(kwargs={'my_str' : 'abcd'})
header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
res = chord(header,queue='susanoo_dev' )(callback)
res1 = chord(header1,queue='susanoo_core')(callback)
print(res.get())
print(res1.get())
print("We are done")
tasks.py
@app.task
def call_back(num, my_str):
print("Everything is done------")
print("Everything is done------")
print my_str
return my_str, sum(num)
But this does not seem to work and throws the following error:
celery.backends.base.ChordError: Callback error: TypeError("call_back() got an unexpected keyword argument 'my_str'",)
Upvotes: 1
Views: 3244
Reputation: 6162
Got the answer. Thanks to a friend who helped me out with it.
All that was being done wrong in the above solution was to not define my_str
as a keyword argument in the definition of the call_back()
.
So the working solution would be:
program.py
def chord_queue():
header = (get_stock_info.subtask((delay,)) for delay in [4, 5, 4])
callback = call_back.subtask(kwargs={'my_str' : 'abcd'})
header1 = (get_stock_info.subtask((delay,)) for delay in [4, 4, 4])
res = chord(header,queue='susanoo_dev' )(callback)
res1 = chord(header1,queue='susanoo_core')(callback)
print(res.get())
print(res1.get())
print("We are done")
task.py
@app.task
def call_back(num, my_str=None):
print("Everything is done------")
print("Everything is done------")
print my_str
return my_str, sum(num)
And it works as expected without any issues.
Upvotes: 3