Alex Punnen
Alex Punnen

Reputation: 6224

Celery handling named argument

I have a celery task like this

@app.task(bind=True,max_retries=3, default_retry_delay=1 * 60)  
def doTargetprefilter(self,*args,**kwargs ):

I am calling this as

args = [None,sourcedns, targetdnlist]
kwargs= {'workername':workername,}
result = r.doTargetprefilter.apply_async(*args,**kwargs)

However I am getting a strange error

File "/usr/local/lib/python3.4/dist-packages/celery/app/amqp.py", line 254, in publish_task
raise ValueError('task kwargs must be a dictionary')

ValueError: task kwargs must be a dictionary

A small unit test of the invocation works fine;

def test_doTargetprefilter(self):
    from  ltecpxx.mrosimpleexecutor import doTargetprefilter
    s=[1,2,3]
    t=[1,2,2]
    workername="ltecpxx.mrosimpleexecutor.SimplePrefilter"
    args =[None,s,t]
    kwargs={'wokername':workername}
    doTargetprefilter(*args,**kwargs)

I have tried all sorts of combinaiton and also seen the apply_async documentation. It works if I make it as a normal method (without *args and **kwargs); What am I doing wrong

Upvotes: 0

Views: 1937

Answers (1)

Alex Punnen
Alex Punnen

Reputation: 6224

The bind annotation supplies the self; So we need to remove that from the args list, and all the arguments must be in a tuple when we call apply_async. Changing these two will give

args = [sourcedns, targetdnlist]
kwargs= {'workername':workername}
result = r.doTargetprefilter.apply_async((args),kwargs)

And function signature

@app.task(bind=True,max_retries=3, default_retry_delay=1 * 60)  # retry in 1 minutes.
def doTargetprefilter(self,*args,workername=""):

Upvotes: 1

Related Questions