Reputation: 593
Consider a task that takes a list as an argument and processes each element in it; processing each element may succeed or fail. In this case, how can I "retry" the task with only the failed elements?
Example:
@app.task(bind=True)
def my_test(self, my_list: list):
    new_list = []
    for ele in my_list:
        try:
            do_something_may_fail(ele)
        except Exception:
            new_list.append(ele)
    # how to retry with the new list?
    # like
    # self.retry(my_list=new_list, countdown=5)
    # or
    # self.apply_async(new_list, countdown=5)
Upvotes: 7
Views: 4377
Reputation: 10699
Use Task.retry with its args and kwargs parameters.
retry(args=None, kwargs=None, exc=None, throw=True, eta=None, countdown=None, max_retries=None, **options)
Retry the task, adding it to the back of the queue.
Parameters
args (Tuple) – Positional arguments to retry with.
kwargs (Dict) – Keyword arguments to retry with.
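Be careful when passing arguments: supplying a value for the same parameter in both args and kwargs results in a failure (Python raises a TypeError about getting multiple values for the argument). Also note that if you leave one of them as None, retry() reuses that value from the original call, so the old kwargs can clash with your new args. Below, I chose to pass everything through args=(<values here>) and to empty out kwargs={}. You could also do it the other way around: pass kwargs={<values here>} and empty out args=().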
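To see why emptying the other container matters, here is a plain-Python sketch (no Celery required) of that clash. It assumes that retry() treats an args or kwargs value of None as "reuse the value from the original call"; `build_retry_call` and `my_test_signature` are hypothetical names used only to illustrate the merge.

```python
def my_test_signature(some_arg_1, my_list, some_arg_2):
    """Hypothetical stand-in with the same parameters as the my_test task."""
    return some_arg_1, my_list, some_arg_2


def build_retry_call(request_args, request_kwargs, args=None, kwargs=None):
    """Assumed fallback: None means 'reuse what the original call had'."""
    final_args = request_args if args is None else args
    final_kwargs = request_kwargs if kwargs is None else kwargs
    return final_args, final_kwargs


# The original call used keyword arguments only.
orig_args = ()
orig_kwargs = {"some_arg_1": 0, "my_list": [1, 2, 3, 4, 5], "some_arg_2": "a"}

# Passing only new args keeps the old kwargs, so some_arg_1 gets two values.
a, k = build_retry_call(orig_args, orig_kwargs, args=(1, [1, 2, 3, 4], "aa"))
try:
    my_test_signature(*a, **k)
except TypeError as exc:
    print("fails:", exc)

# Emptying kwargs explicitly avoids the clash.
a, k = build_retry_call(orig_args, orig_kwargs, args=(1, [1, 2, 3, 4], "aa"), kwargs={})
print("works:", my_test_signature(*a, **k))
```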
tasks.py
from celery import Celery

app = Celery('tasks')


@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def my_test(self, some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        self.retry(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            # Empty it out to avoid having multiple values for the arguments,
            # whether we initially called it with args or kwargs or both.
            kwargs={},
        )
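The task above fakes the filtering step with my_list[:-1]. In practice you would collect the elements whose processing raised, as in the question's try/except loop. Below is a minimal sketch of that step, kept free of Celery so it can be tested on its own; `split_failures` and `flaky` are hypothetical names, and catching `Exception` broadly is an assumption you may want to narrow to the errors you expect to be transient.

```python
from typing import Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def split_failures(
    items: Iterable[T], process: Callable[[T], object]
) -> Tuple[List[T], List[T]]:
    """Run `process` on each item and return (succeeded, failed), preserving order."""
    succeeded: List[T] = []
    failed: List[T] = []
    for item in items:
        try:
            process(item)
        except Exception:  # narrow this to the errors worth retrying
            failed.append(item)
        else:
            succeeded.append(item)
    return succeeded, failed


def flaky(x: int) -> int:
    """Hypothetical worker that fails on even numbers."""
    if x % 2 == 0:
        raise ValueError(f"cannot process {x}")
    return x


ok, bad = split_failures([1, 2, 3, 4, 5], flaky)
print(ok, bad)  # → [1, 3, 5] [2, 4]
```

Inside the bound task you would then call self.retry(args=(some_arg_1 + 1, bad, some_arg_2 * 2), kwargs={}) only when bad is non-empty, like the `if new_list:` branch above. Keep that call outside any broad try/except: with the default throw=True, self.retry() signals the retry by raising celery.exceptions.Retry, which `except Exception` would otherwise swallow.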
Logs (Producer)
>>> from tasks import *
>>> my_test.apply_async(args=(0, [1,2,3,4,5], "a"))
<AsyncResult: 121090c6-6b77-4cbd-b1d1-790005e8b18c>
>>>
>>> # The call above is equivalent to either of the following (same result):
>>> # my_test.apply_async(kwargs={'some_arg_1': 0, 'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
>>> # my_test.apply_async(args=(0,), kwargs={'my_list': [1,2,3,4,5], 'some_arg_2': "a"})
Logs (Consumer)
[2021-08-25 21:32:06,433: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,434: WARNING/MainProcess] my_test 0 [1, 2, 3, 4, 5] a
[2021-08-25 21:32:06,434: WARNING/MainProcess]
[2021-08-25 21:32:06,438: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,439: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,539: WARNING/MainProcess] my_test 1 [1, 2, 3, 4] aa
[2021-08-25 21:32:06,539: WARNING/MainProcess]
[2021-08-25 21:32:06,541: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,542: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,640: WARNING/MainProcess] my_test 2 [1, 2, 3] aaaa
[2021-08-25 21:32:06,640: WARNING/MainProcess]
[2021-08-25 21:32:06,642: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,643: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,742: WARNING/MainProcess] my_test 3 [1, 2] aaaaaaaa
[2021-08-25 21:32:06,743: WARNING/MainProcess]
[2021-08-25 21:32:06,745: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] retry: Retry in 0.1s
[2021-08-25 21:32:06,747: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] received
[2021-08-25 21:32:06,844: WARNING/MainProcess] my_test 4 [1] aaaaaaaaaaaaaaaa
[2021-08-25 21:32:06,844: WARNING/MainProcess]
[2021-08-25 21:32:06,844: INFO/MainProcess] Task tasks.my_test[121090c6-6b77-4cbd-b1d1-790005e8b18c] succeeded in 0.0005442450019472744s: None
some_arg_1 increments by 1 per retry, from a starting value of 0 to a final value of 4.
my_list loses 1 item per retry, from a starting value of [1, 2, 3, 4, 5] to a final value of [1].
some_arg_2 doubles its length per retry, from a starting value of "a" to a final value of "aaaaaaaaaaaaaaaa" (16 characters).
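The progression in the consumer log can be reproduced without a broker by replacing each retry with one loop iteration. This is only a local simulation of the example's argument updates (`simulate_retries` is a hypothetical name), handy for checking that the arguments evolve as intended before wiring them into self.retry(). After n retries, some_arg_2 has length 2**n, which is why the fifth run sees 16 characters.

```python
def simulate_retries(some_arg_1: int, my_list: list, some_arg_2: str) -> list:
    """Apply the example's per-retry argument updates until nothing is left to retry."""
    calls = []
    while True:
        # Record the arguments this run of the task would receive.
        calls.append((some_arg_1, list(my_list), some_arg_2))
        new_list = my_list[:-1]  # same stand-in filter as tasks.py: last item "succeeds"
        if not new_list:
            return calls  # the task would finish successfully here
        some_arg_1, my_list, some_arg_2 = some_arg_1 + 1, new_list, some_arg_2 * 2


for call in simulate_retries(0, [1, 2, 3, 4, 5], "a"):
    print(call)
```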
Alternatively, call the same task again from within the task itself, in a recursion-like fashion.
tasks.py
from celery import Celery

app = Celery('tasks')


@app.task
def my_test(some_arg_1: int, my_list: list, some_arg_2: str):
    print(f"my_test {some_arg_1} {my_list} {some_arg_2}")

    # Filter the failed items. Here, let's say only the last item is successful.
    new_list = my_list[:-1]

    if new_list:
        my_test.apply_async(
            args=(
                some_arg_1 + 1,  # some_arg_1 increments per retry
                new_list,  # Failed items
                some_arg_2 * 2,  # some_arg_2's length doubles per retry
            ),
            # Empty it out to avoid having multiple values for the arguments,
            # whether we initially called it with args or kwargs or both.
            kwargs={},
        )
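With this approach each apply_async call enqueues a brand-new task with its own id, so the retry bookkeeping that self.retry() uses (the retry count checked against max_retries) does not limit the loop: an element that always fails would be re-queued indefinitely. One way to bound it, assuming you are free to add a hypothetical `attempt` parameter to the task, is to thread an attempt counter through the arguments. The decision logic is sketched below in plain Python; `next_dispatch_args` and `MAX_ATTEMPTS` are illustrative names, not Celery API.

```python
from typing import List, Optional, Sequence, Tuple

MAX_ATTEMPTS = 3  # hypothetical cap on how many times the task runs in total


def next_dispatch_args(
    attempt: int, failed: Sequence, max_attempts: int = MAX_ATTEMPTS
) -> Optional[Tuple[int, List]]:
    """Return args for the next my_test.apply_async(...) call, or None to stop."""
    if not failed:
        return None  # everything succeeded, nothing to re-dispatch
    if attempt + 1 >= max_attempts:
        return None  # cap reached; log or dead-letter `failed` instead
    return (attempt + 1, list(failed))


# Inside the task (sketch, assuming a signature like my_test(attempt, my_list)):
#     nxt = next_dispatch_args(attempt, new_list)
#     if nxt is not None:
#         my_test.apply_async(args=nxt, kwargs={}, countdown=5)
```

Starting from attempt=0, the task runs at most three times (attempts 0, 1 and 2) before giving up on the remaining elements.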
Logs (Producer and Consumer)
Upvotes: 5