Hamedio
Hamedio

Reputation: 117

how break a celery task inside task

I an using celery with django.inside a forloop , each turn a value sets in db for a relationship

lawyers=Lawyer.objects.filter(consultation_status=True)
for idx,lawyer in enumerate(lawyers):
    if(consultation.lawyer):
        break
    change_offered_lawyer.apply_async((id,lawyer.id),countdown=idx*60)

each turn in the loop inside task i check the condition and my goal is if the condition terminated then break all those tasks.

@app.task
def change_offered_lawyer(consulation_id,consulator_id):
    consulation=ConsultationOrder.objects.get(id=consulation_id)
    consulator=Lawyer.objects.get(id=consulator_id)
    if(consultation.lawyer):
        #break all tasks 
    consultation.offered_lawyer=consulator
    consultation.save()

Upvotes: 1

Views: 964

Answers (1)

Niel Godfrey P. Ponciano
Niel Godfrey P. Ponciano

Reputation: 10699

For a simpler explanation below, we would just use a list of numbers 1-10, which should break after the 4th number, thus processing 1-3 and skipping 4-10.

Solution 1: Using chained tasks

Design summary:

Link each task next to each other. After one task returns, the next one is called, and so on. A task must return a value that will indicate if the next task should continue or break.

Producer:

from celery import chain

from task import change_offered_lawyer

tasks = []

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == 1:
        # If first item, manually set the initial value for should_continue
        tasks.append(change_offered_lawyer.s(True, lawyer_id))
    else:
        tasks.append(change_offered_lawyer.s(lawyer_id))

chain(*tasks).apply_async()

Consumer:

from celery import shared_task


@shared_task
def change_offered_lawyer(should_continue, lawyer_id):
    if not should_continue:
        print(f"{lawyer_id=} Break...")
        return False

    if lawyer_id == 4:
        print(f"{lawyer_id=} Break now!")
        return False

    print(f"{lawyer_id=} Continue...")
    return True

Logs:

[2021-08-10 03:04:03,294: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] received
[2021-08-10 03:04:03,296: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:04:03,296: WARNING/MainProcess] 

[2021-08-10 03:04:03,299: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] succeeded in 0.003656674999547249s: True
[2021-08-10 03:04:03,300: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] received
[2021-08-10 03:04:03,301: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:04:03,301: WARNING/MainProcess] 

[2021-08-10 03:04:03,302: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] succeeded in 0.001406519999363809s: True
[2021-08-10 03:04:03,303: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] received
[2021-08-10 03:04:03,305: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:04:03,305: WARNING/MainProcess] 

[2021-08-10 03:04:03,307: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] succeeded in 0.0023091260000001057s: True
[2021-08-10 03:04:03,308: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] received
[2021-08-10 03:04:03,313: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:04:03,313: WARNING/MainProcess] 

[2021-08-10 03:04:03,314: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] succeeded in 0.001986995999686769s: False
[2021-08-10 03:04:03,315: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] received
[2021-08-10 03:04:03,318: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:04:03,318: WARNING/MainProcess] 

[2021-08-10 03:04:03,324: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] succeeded in 0.006545270000060555s: False
[2021-08-10 03:04:03,325: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] received
[2021-08-10 03:04:03,327: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:04:03,327: WARNING/MainProcess] 

[2021-08-10 03:04:03,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] succeeded in 0.002028814999903261s: False
[2021-08-10 03:04:03,329: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] received
[2021-08-10 03:04:03,330: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:04:03,330: WARNING/MainProcess] 

[2021-08-10 03:04:03,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] succeeded in 0.0015183160003289231s: False
[2021-08-10 03:04:03,332: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] received
[2021-08-10 03:04:03,333: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:04:03,333: WARNING/MainProcess] 

[2021-08-10 03:04:03,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] succeeded in 0.0019754629993258277s: False
[2021-08-10 03:04:03,336: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] received
[2021-08-10 03:04:03,337: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:04:03,337: WARNING/MainProcess] 

[2021-08-10 03:04:03,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] succeeded in 0.0016348939998351852s: False
[2021-08-10 03:04:03,339: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] received
[2021-08-10 03:04:03,341: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:04:03,341: WARNING/MainProcess] 

[2021-08-10 03:04:03,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] succeeded in 0.00046298599954752717s: False

References:

Solution 2: Using a common storage to track status

Design summary:

Use a database storage that will track whether the execution must continue or break. Here, we would use django-caching system for ease of use. Note that if you would ever intend to run the tasks in parallel without the countdown, this solution could be effective against race conditions.

settings.py :

...
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "my_cache_table",
    }
}
...
  • After this, run python manage.py createcachetable

Producer:

from django.core.cache import cache

from task import change_offered_lawyer, reset_cache

cache.set(key='change_offered_lawyer', value='continue', timeout=None)

lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == lawyers_count:
        # If last item, reset the cache
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
    else:
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)

Consumer:

from celery import shared_task
from django.core.cache import cache


@shared_task
def change_offered_lawyer(lawyer_id):
    if cache.get('change_offered_lawyer') == 'break':
        print(f"{lawyer_id=} Break...")
        return

    if lawyer_id == 4:
        cache.set(key='change_offered_lawyer', value='break', timeout=None)
        print(f"{lawyer_id=} Break now!")
        return

    print(f"{lawyer_id=} Continue...")


@shared_task
def reset_cache():
    cache.delete('change_offered_lawyer')

Logs:

[2021-08-10 03:10:32,333: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] received
[2021-08-10 03:10:32,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] received
[2021-08-10 03:10:32,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] received
[2021-08-10 03:10:32,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] received
[2021-08-10 03:10:32,344: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] received
[2021-08-10 03:10:32,345: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] received
[2021-08-10 03:10:32,346: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] received
[2021-08-10 03:10:32,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] received
[2021-08-10 03:10:37,258: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:10:37,258: WARNING/MainProcess] 

[2021-08-10 03:10:37,258: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] succeeded in 0.01066518100014946s: None
[2021-08-10 03:10:42,347: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:10:42,347: WARNING/MainProcess] 

[2021-08-10 03:10:42,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] succeeded in 0.0018151690001104726s: None
[2021-08-10 03:10:47,350: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:10:47,350: WARNING/MainProcess] 

[2021-08-10 03:10:47,350: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] succeeded in 0.004217886999867915s: None
[2021-08-10 03:10:52,340: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:10:52,340: WARNING/MainProcess] 

[2021-08-10 03:10:52,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] succeeded in 0.013524283999686304s: None
[2021-08-10 03:10:57,330: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:10:57,330: WARNING/MainProcess] 

[2021-08-10 03:10:57,330: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] succeeded in 0.004878013000052306s: None
[2021-08-10 03:11:02,338: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:11:02,338: WARNING/MainProcess] 

[2021-08-10 03:11:02,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] succeeded in 0.00420587299959152s: None
[2021-08-10 03:11:07,336: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:11:07,336: WARNING/MainProcess] 

[2021-08-10 03:11:07,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] succeeded in 0.005373962000703614s: None
[2021-08-10 03:11:12,327: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:11:12,328: WARNING/MainProcess] 

[2021-08-10 03:11:12,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] succeeded in 0.004143920999922557s: None
[2021-08-10 03:11:17,342: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:11:17,342: WARNING/MainProcess] 

[2021-08-10 03:11:17,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] succeeded in 0.004410948999975517s: None
[2021-08-10 03:11:22,327: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:11:22,327: WARNING/MainProcess] 

[2021-08-10 03:11:22,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] succeeded in 0.007513158000620024s: None
[2021-08-10 03:11:22,332: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] received
[2021-08-10 03:11:22,338: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] succeeded in 0.004719080000540998s: None

References:

Solution 3: Run the tasks synchronously (Not advisable)

Design summary:

Update change_offered_lawyer to return an indicator to whether continue or break. Then after calling apply_async(), call get() to receive the result synchronously and break if needed.

Upvotes: 1

Related Questions