samfrances

Reputation: 3695

Celery canvas group of chains passing too many arguments to constituent tasks

I am encountering some very strange behavior with the following sort of celery workflow:

workflow = group(
    chain(task1.s(), task2.s()),
    chain(task3.s(), task4.s()),
)

This is in the context of Django.

When I call the workflow as follows:

workflow.apply_async((n,))

...for any integer value of n, the first task in each chain (task1 and task3) will fail with a TypeError like the following (taken from celery events):

   args: [9, 8, 7, 5, 4, 3]
   kwargs: {} 
   retries: 0 
   exception: TypeError('task1() takes exactly 1 argument (6 given)',) 
   state: FAILURE

The arguments after the first are always arguments that the workflow was previously called with. So in this example, I called workflow.apply_async((9,)) on this occasion, and the other numbers are values that were passed on previous occasions. On each occasion, the erroneous extra arguments passed to task1 and task3 are the same.
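To be explicit about what I expect: each chain's head task should receive exactly the one argument passed to apply_async, and each chain should pipe results along independently. A plain-Python sketch of those intended semantics (the task bodies here are invented placeholders; only the wiring matters):

```python
def task1(n):
    return n + 1

def task2(x):
    return x * 2

def task3(n):
    return n - 1

def task4(x):
    return x * 10

def run_workflow(n):
    # group(chain(task1.s(), task2.s()), chain(task3.s(), task4.s()))
    # called with apply_async((n,)): each chain's first task gets the
    # single argument n, and each chain feeds its result forward.
    return [task2(task1(n)), task4(task3(n))]
```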

I'm tempted to post this as a bug report to celery, but I'm not yet certain the mistake isn't mine in some way.

Things I have ruled out:

The only moderately unusual thing about my setup, although I can't see how it's connected, is that task1 and task3 are configured with different queues.
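For reference, the queue split is configured roughly like this (module paths and queue names are placeholders, not my real ones):

```python
from celery import Celery

app = Celery("myapp")

# Hypothetical routing config: task1 and task3 go to different queues.
app.conf.task_routes = {
    "myapp.tasks.task1": {"queue": "queue_a"},
    "myapp.tasks.task3": {"queue": "queue_b"},
}
```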

Upvotes: 0

Views: 1129

Answers (1)

Skanda Avadhani

Reputation: 153

I came across a similar problem when I was working with celery's task.chunks().

I solved it by wrapping each item of the list in a single-element tuple. For instance,

suppose log_i() is a shared_task that simply logs a variable i, and I wish to log a list of all such values by chunking. I'd do -

# log_i shared task
import logging

from celery import shared_task

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def log_i(self, i):
    logger.info(i)
    return i

And

# some calling site 
# ...
very_long_list = [{"i1": i, "i2": i+i, "i3": i**2} for i in range(1000)]
res = log_i.chunks(zip(very_long_list), 10)()
print(res.get())

# ...

Note that doing something such as -

# ...
res = log_i.chunks(very_long_list, 10)()
# ...

will fail with the error you're talking about: chunks star-applies each item as the task's positional arguments, so a multi-element item (like the dicts above) is unpacked into several arguments instead of being passed as one.

Zipping wraps each item, unchanged, in a single-element tuple, so the whole item arrives as the one argument that log_i expects.
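You can see the mechanics in plain Python, with chunks's star-application simulated by calling log_i(*item) directly (log_i here is a stand-in for the shared task, no broker needed):

```python
def log_i(i):
    # stand-in for the shared-task body; just returns its single argument
    return i

items = [{"i1": 0, "i2": 0, "i3": 0}, {"i1": 1, "i2": 2, "i3": 1}]

# Without zip: chunks effectively calls log_i(*item), and a 3-key dict
# unpacks into three positional arguments -> TypeError.
failed = False
try:
    log_i(*items[0])
except TypeError:
    failed = True

# With zip: each item is wrapped in a 1-tuple, so the whole dict
# arrives as the single expected argument.
wrapped = list(zip(items))
results = [log_i(*args) for args in wrapped]
```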

Upvotes: 1
