Reputation: 3695
I am encountering some very strange behavior with the following sort of celery workflow:
workflow = group(
    chain(task1.s(), task2.s()),
    chain(task3.s(), task4.s()),
)
This is in the context of django.
When I call the workflow as follows:
workflow.apply_async((n,))
...for any integer value of n, the first task in each chain (task1 and task3) will fail with a TypeError like the following (taken from celery events):
args: [9, 8, 7, 5, 4, 3]
kwargs: {}
retries: 0
exception: TypeError('task1() takes exactly 1 argument (6 given)',)
state: FAILURE
The arguments after the first are always arguments that the workflow was previously called with. So, in this example, I have called workflow.apply_async((9,)) on this occasion, and the other numbers are values that were passed on previous occasions. On each occasion, the erroneous arguments passed to task1 and task3 will be the same.
I'm tempted to post this as a bug report to celery, but I'm not yet certain the mistake isn't mine in some way.
Things I have ruled out:

- Mutation of the arguments I pass to workflow.apply_async. I have separately constructed and logged the tuple that I pass, to make sure of this (a minimal sketch of that check follows below).
- Passing a list to apply_async rather than a tuple. I am definitely passing a tuple (i.e. immutable).

The only moderately unusual thing about my setup, although I can't see how it's connected, is that task1 and task3 are configured with different queues.
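For reference, the logging check from the first point looks roughly like this (a minimal sketch, assuming a standard module-level logger; n is the integer from above):

import logging

logger = logging.getLogger(__name__)

# Build the tuple separately so it can be inspected before the call.
args = (n,)
logger.info("calling workflow with args=%r", args)
workflow.apply_async(args)  # the log always shows a fresh one-element tuple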
Upvotes: 0
Views: 1129
Reputation: 153
I came across a similar problem when I was working with celery's task.chunks(). I solved it by wrapping each item of the list into a single-element tuple. For instance, suppose that the task log_i() is a shared_task which essentially logs the variable i, and I wish to log a list of all is by chunking. I'd do -
# log_i shared task
import logging
from celery import shared_task

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def log_i(self, i):
    logger.info(i)
    return i
And
# some calling site
# ...
very_long_list = [{"i1": i, "i2": i + i, "i3": i ** 2} for i in range(1000)]
# zip() wraps each dict in a one-element tuple -- the args tuple for one call
res = log_i.chunks(zip(very_long_list), 10)()
print(res.get())
# ...
Note that doing something like -

# ...
res = log_i.chunks(very_long_list, 10)()
# ...

will fail with the error you're describing when the items in the list are not themselves iterables. Zipping moves each item as-is into a new one-element tuple, and by this you can capture it as the single i argument of the log_i task.
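To make the zip() trick concrete, here's a quick illustration (plain Python, no Celery involved; the variable name is just for the example):

items = [{"i1": 0}, {"i1": 1}]
print(list(zip(items)))
# [({'i1': 0},), ({'i1': 1},)]  -- each dict arrives as a one-element args tuple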
Upvotes: 1