Reputation: 6224
I have a Celery task like this:
@app.task(bind=True, max_retries=3, default_retry_delay=1 * 60)
def doTargetprefilter(self, *args, **kwargs):
I am calling it like this:
args = [None,sourcedns, targetdnlist]
kwargs= {'workername':workername,}
result = r.doTargetprefilter.apply_async(*args,**kwargs)
However, I am getting a strange error:
File "/usr/local/lib/python3.4/dist-packages/celery/app/amqp.py", line 254, in publish_task
raise ValueError('task kwargs must be a dictionary')
ValueError: task kwargs must be a dictionary
A small unit test that calls the function directly (not through apply_async) works fine:
def test_doTargetprefilter(self):
    from ltecpxx.mrosimpleexecutor import doTargetprefilter
    s = [1, 2, 3]
    t = [1, 2, 2]
    workername = "ltecpxx.mrosimpleexecutor.SimplePrefilter"
    args = [None, s, t]
    kwargs = {'workername': workername}
    doTargetprefilter(*args, **kwargs)
I have tried all sorts of combinations and have also read the apply_async documentation. It works if I make it a normal method (without *args and **kwargs). What am I doing wrong?
Upvotes: 0
Views: 1937
Reputation: 6224
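With bind=True, Celery supplies self (the task instance) itself, so we need to remove the None placeholder from the args list. Also, apply_async does not take the task's arguments unpacked: it expects the positional arguments as one sequence and the keyword arguments as one dictionary. With apply_async(*args, **kwargs), None is bound to apply_async's own args parameter and sourcedns to its kwargs parameter, which is why the error complains that the task kwargs must be a dictionary. Changing these two things gives: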
args = [sourcedns, targetdnlist]
kwargs = {'workername': workername}
result = r.doTargetprefilter.apply_async(args, kwargs)
And the function signature becomes:
@app.task(bind=True, max_retries=3, default_retry_delay=1 * 60)  # retry after 1 minute
def doTargetprefilter(self, *args, workername=""):
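To see why the unpacked call fails, here is a minimal sketch that does not use Celery at all. apply_async_stub is a hypothetical stand-in that only mirrors the first parameters of apply_async (args, kwargs, task_id) and the dictionary check from the traceback; the sample values for sourcedns and targetdnlist are made up for illustration.

```python
def apply_async_stub(args=None, kwargs=None, task_id=None, **options):
    # Hypothetical stand-in, not Celery code: it copies only the order of the
    # leading parameters of apply_async and the check seen in the traceback.
    if kwargs is not None and not isinstance(kwargs, dict):
        raise ValueError("task kwargs must be a dictionary")
    return tuple(args or ()), dict(kwargs or {})


# Placeholder data (assumed for this sketch).
sourcedns = ["dn1", "dn2"]
targetdnlist = ["dn3"]
workername = "ltecpxx.mrosimpleexecutor.SimplePrefilter"

# Call from the question: unpacking binds None -> args, sourcedns -> kwargs,
# targetdnlist -> task_id, and workername lands in **options.
try:
    apply_async_stub(*[None, sourcedns, targetdnlist], **{"workername": workername})
    error = None
except ValueError as exc:
    error = str(exc)

# Corrected call: one sequence of positional arguments, one dict of keyword arguments.
sent_args, sent_kwargs = apply_async_stub(
    [sourcedns, targetdnlist], {"workername": workername}
)

print(error)
print(sent_args, sent_kwargs)
```

In the corrected call the list and the dictionary are passed through as they are, so the task receives sourcedns and targetdnlist as positional arguments and workername as a keyword argument.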
Upvotes: 1