Reputation: 13
@app.task
def Task1():
print("this is task 1")
return "Task-1 Done"
Just take an example I want to restart the task when it's completed
Upvotes: 1
Views: 1136
Reputation: 10709
If you want to call the task multiple times and make it use the same task id every time, you can use the task_id
argument of apply_async.
Note that this is not applicable with delay as documented:
delay(*args, **kwargs)
Star argument version of apply_async().
Does not support the extra options enabled by apply_async().
@app.task(bind=True)
def Task1(self):
print(f"this is task 1 {self.request.id}")
>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>>
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4]
[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4]
[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
ba488582-9d7d-4bda-a19d-a2b0bf9b503f
(as visible in the AsyncResult
as well)If you want to keep on restarting the task, here are some options. All options below are infinitely recursive. You might want to put some conditions within the task about when the endless loop would be terminated, such as adding an input to the task and using it as basis if the execution must already stop.
Option 1: Call the task itself asynchronously within the same task. This is somewhat recursion-like. This would use the same task id as how it was done in the Manual consecutive calls (see above).
@app.task(bind=True)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
Task1.apply_async(task_id=self.request.id)
Option 2: Trigger the retry mechanism provided by celery. This will use the same task id on the same queue as documented in that link:
When you call retry it’ll send a new message, using the same task-id, and it’ll take care to make sure the message is delivered to the same queue as the originating task.
We can verify this by displaying the task id via self.request.id
.
@app.task(
bind=True,
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
raise self.retry()
Option 3: Retry only for a specific scenario (here is RestartTaskNeeded
). Same with Option 2, this will also use the same task id on the same queue.
class RestartTaskNeeded(Exception):
pass
@app.task(
bind=True,
autoretry_for=(RestartTaskNeeded,),
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
raise RestartTaskNeeded
Output:
>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
[2021-08-12 07:51:29,783: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:31,796: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:31,797: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:31,820: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:31,820: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:34,028: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:34,028: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:38,038: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:38,039: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:40,041: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:40,042: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:42,044: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:42,045: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:42,049: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:42,051: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:44,050: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:44,051: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:46,057: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:46,058: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:48,682: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:48,683: WARNING/ForkPoolWorker-1]
[2021-08-12 07:51:48,687: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:48,688: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
... and so on ...
999e9de0-292f-412d-a9a8-b5c0013bdab3
(as visible in the AsyncResult
as well)Depending on your exact purpose for this question, you might also be interested with Celery canvas e.g. chaining of tasks (calling a task after the completion of another task, the tasks maybe different or maybe the same).
Upvotes: 1