Prabhat

Reputation: 13

Django celery - Restart the task when it's completed


    @app.task
    def Task1():
        print("this is task 1")
        return  "Task-1 Done"

Taking the task above as an example, I want to restart it once it has completed.

Upvotes: 1

Views: 1136

Answers (1)

Niel Godfrey P. Ponciano

Reputation: 10709

Manual consecutive calls

If you want to call the task multiple times and make it use the same task id every time, you can use the task_id argument of apply_async.

Note that this does not work with delay, which, as documented, does not accept the extra options:

delay(*args, **kwargs)

  • Star argument version of apply_async().

  • Does not support the extra options enabled by apply_async().

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")

>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> 
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] 

[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
  • The task id is the same for all executions (here ba488582-9d7d-4bda-a19d-a2b0bf9b503f), as the AsyncResult also shows

Automatic consecutive calls

If you want the task to keep restarting itself, here are some options. All of the options below repeat indefinitely, so you might want to add a condition inside the task that ends the loop, for example an input argument that the task checks to decide whether it should stop.

Option 1: Call the task itself asynchronously from within the same task. This is somewhat recursion-like, and it uses the same task id in the same way as the manual consecutive calls above.

import time

@app.task(bind=True)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)  # simulate some work
    print("re-trigger task 1")
    # Queue the same task again, reusing the current task id
    Task1.apply_async(task_id=self.request.id)

Option 2: Trigger the retry mechanism provided by Celery. This uses the same task id on the same queue, as the Celery documentation on retrying states:

When you call retry it’ll send a new message, using the same task-id, and it’ll take care to make sure the message is delivered to the same queue as the originating task.

We can verify this by displaying the task id via self.request.id.

import time

@app.task(
    bind=True,
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,  # None means retry forever
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)  # simulate some work
    print("re-trigger task 1")
    raise self.retry()

Option 3: Retry only in a specific scenario (here, when RestartTaskNeeded is raised). As with Option 2, this also uses the same task id on the same queue.

import time


class RestartTaskNeeded(Exception):
    pass


@app.task(
    bind=True,
    autoretry_for=(RestartTaskNeeded,),
    default_retry_delay=0.1,
    retry_backoff=False,
    max_retries=None,
)
def Task1(self):
    print(f"this is task 1 {self.request.id}")
    time.sleep(2)
    print("re-trigger task 1")
    raise RestartTaskNeeded

Output (from Option 3):

>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
[2021-08-12 07:51:29,783: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:31,796: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:31,797: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:31,820: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:31,820: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:34,028: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:34,028: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:38,038: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:38,039: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:40,041: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:40,042: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:42,044: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:42,045: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:42,049: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:42,051: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:44,050: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:44,051: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:46,057: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:46,058: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:48,682: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:48,683: WARNING/ForkPoolWorker-1] 

[2021-08-12 07:51:48,687: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:48,688: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received

... and so on ...
  • The task always (automatically) "restarts" after completion
  • The task id is the same for all executions (here 999e9de0-292f-412d-a9a8-b5c0013bdab3), as the AsyncResult also shows

Further Reading

Depending on what exactly you are trying to achieve, you might also be interested in Celery canvas, e.g. chaining of tasks (calling a task after another task completes; the tasks may be different or the same).

Upvotes: 1
