Reputation: 117
I am using Celery with Django. Inside a for loop, each iteration schedules a task that sets a value in the database for a relationship:
lawyers = Lawyer.objects.filter(consultation_status=True)
for idx, lawyer in enumerate(lawyers):
    if consultation.lawyer:
        break
    change_offered_lawyer.apply_async((id, lawyer.id), countdown=idx * 60)
Each task checks the condition when it runs. My goal is that once the condition is met, all the remaining scheduled tasks are stopped.
@app.task
def change_offered_lawyer(consultation_id, consulator_id):
    consultation = ConsultationOrder.objects.get(id=consultation_id)
    consulator = Lawyer.objects.get(id=consulator_id)
    if consultation.lawyer:
        # break all tasks
        return
    consultation.offered_lawyer = consulator
    consultation.save()
Upvotes: 1
Views: 964
Reputation: 10699
To keep the explanation simple, the examples below use a list of numbers 1-10 in place of lawyers and break at the 4th number, so 1-3 are processed and 4-10 are skipped.
Design summary:
Chain the tasks so that each one is called only after the previous one returns. Each task must return a value that tells the next task whether to continue or break.
Producer:
from celery import chain
from task import change_offered_lawyer
tasks = []
lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == 1:
        # If first item, manually set the initial value for should_continue
        tasks.append(change_offered_lawyer.s(True, lawyer_id))
    else:
        tasks.append(change_offered_lawyer.s(lawyer_id))
chain(*tasks).apply_async()
Consumer:
from celery import shared_task
@shared_task
def change_offered_lawyer(should_continue, lawyer_id):
    if not should_continue:
        print(f"{lawyer_id=} Break...")
        return False
    if lawyer_id == 4:
        print(f"{lawyer_id=} Break now!")
        return False
    print(f"{lawyer_id=} Continue...")
    return True
Logs:
[2021-08-10 03:04:03,294: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] received
[2021-08-10 03:04:03,296: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:04:03,296: WARNING/MainProcess]
[2021-08-10 03:04:03,299: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b01490c6-59ee-473e-b15d-40d4f628444c] succeeded in 0.003656674999547249s: True
[2021-08-10 03:04:03,300: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] received
[2021-08-10 03:04:03,301: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:04:03,301: WARNING/MainProcess]
[2021-08-10 03:04:03,302: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[3ea4e578-4f56-491b-a774-b0a95b33c123] succeeded in 0.001406519999363809s: True
[2021-08-10 03:04:03,303: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] received
[2021-08-10 03:04:03,305: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:04:03,305: WARNING/MainProcess]
[2021-08-10 03:04:03,307: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[dca891ef-4bac-4fa0-85e3-9d7ddfe031d5] succeeded in 0.0023091260000001057s: True
[2021-08-10 03:04:03,308: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] received
[2021-08-10 03:04:03,313: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:04:03,313: WARNING/MainProcess]
[2021-08-10 03:04:03,314: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[ea57df15-88e7-4b65-8fc3-79df0a189652] succeeded in 0.001986995999686769s: False
[2021-08-10 03:04:03,315: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] received
[2021-08-10 03:04:03,318: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:04:03,318: WARNING/MainProcess]
[2021-08-10 03:04:03,324: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[1e8fa660-2125-4790-b44e-4f884b826b40] succeeded in 0.006545270000060555s: False
[2021-08-10 03:04:03,325: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] received
[2021-08-10 03:04:03,327: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:04:03,327: WARNING/MainProcess]
[2021-08-10 03:04:03,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[b6825617-6162-4273-ad14-c128190e18be] succeeded in 0.002028814999903261s: False
[2021-08-10 03:04:03,329: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] received
[2021-08-10 03:04:03,330: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:04:03,330: WARNING/MainProcess]
[2021-08-10 03:04:03,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[5d8c1201-48c7-4238-8f92-b999adccfad9] succeeded in 0.0015183160003289231s: False
[2021-08-10 03:04:03,332: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] received
[2021-08-10 03:04:03,333: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:04:03,333: WARNING/MainProcess]
[2021-08-10 03:04:03,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[8aedf4ad-b3d6-44f6-b5e8-8227bc02df8a] succeeded in 0.0019754629993258277s: False
[2021-08-10 03:04:03,336: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] received
[2021-08-10 03:04:03,337: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:04:03,337: WARNING/MainProcess]
[2021-08-10 03:04:03,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[28dfedc7-40d6-45ab-8d3b-972db47e99bf] succeeded in 0.0016348939998351852s: False
[2021-08-10 03:04:03,339: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] received
[2021-08-10 03:04:03,341: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:04:03,341: WARNING/MainProcess]
[2021-08-10 03:04:03,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer_2[288f6c3d-77d7-4161-9b77-45bcd815f14e] succeeded in 0.00046298599954752717s: False
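The logs above show that every task in the chain still runs; tasks 5-10 simply return early because they receive False from their predecessor. The flag passing can be tried without a broker with a plain-Python simulation. This is only a sketch: run_chain_locally is a hypothetical helper, not part of Celery, and the function below is the consumer logic without the @shared_task decorator or the print calls.

```python
def change_offered_lawyer(should_continue, lawyer_id):
    # Same decision logic as the consumer task, minus Celery and logging.
    if not should_continue:
        return False
    if lawyer_id == 4:
        return False
    return True


def run_chain_locally(lawyer_ids):
    """Feed each return value into the next call, the way a chain
    prepends the parent's result to the next task's arguments."""
    should_continue = True  # initial value supplied by the producer
    processed = []
    for lawyer_id in lawyer_ids:
        should_continue = change_offered_lawyer(should_continue, lawyer_id)
        if should_continue:
            processed.append(lawyer_id)
    return processed


print(run_chain_locally(range(1, 11)))  # [1, 2, 3]
```

Once one call returns False, every later call returns False as well, which is why the remaining tasks are cheap no-ops rather than being cancelled.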
Design summary:
Use shared storage to track whether execution should continue or break. Here, we use Django's cache framework with the database backend for ease of use. Note that if you ever intend to run the tasks in parallel without the countdown, a shared flag like this still lets every task see the break signal, although a task that has already passed the check when the flag is set will run to completion.
settings.py :
...
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.db.DatabaseCache",
        "LOCATION": "my_cache_table",
    }
}
...
python manage.py createcachetable
Producer:
from django.core.cache import cache
from task import change_offered_lawyer, reset_cache
cache.set(key='change_offered_lawyer', value='continue', timeout=None)
lawyers_count = 10
for lawyer_id in range(1, lawyers_count + 1):
    if lawyer_id == lawyers_count:
        # If last item, reset the cache
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5, link=reset_cache.si())
    else:
        change_offered_lawyer.apply_async((lawyer_id,), countdown=lawyer_id * 5)
Consumer:
from celery import shared_task
from django.core.cache import cache
@shared_task
def change_offered_lawyer(lawyer_id):
    if cache.get('change_offered_lawyer') == 'break':
        print(f"{lawyer_id=} Break...")
        return
    if lawyer_id == 4:
        cache.set(key='change_offered_lawyer', value='break', timeout=None)
        print(f"{lawyer_id=} Break now!")
        return
    print(f"{lawyer_id=} Continue...")
@shared_task
def reset_cache():
    cache.delete('change_offered_lawyer')
Logs:
[2021-08-10 03:10:32,333: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] received
[2021-08-10 03:10:32,335: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] received
[2021-08-10 03:10:32,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] received
[2021-08-10 03:10:32,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] received
[2021-08-10 03:10:32,344: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] received
[2021-08-10 03:10:32,345: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] received
[2021-08-10 03:10:32,346: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] received
[2021-08-10 03:10:32,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] received
[2021-08-10 03:10:32,349: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] received
[2021-08-10 03:10:37,258: WARNING/MainProcess] lawyer_id=1 Continue...
[2021-08-10 03:10:37,258: WARNING/MainProcess]
[2021-08-10 03:10:37,258: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[604585ed-76aa-44b6-9e9c-2119f0e33da2] succeeded in 0.01066518100014946s: None
[2021-08-10 03:10:42,347: WARNING/MainProcess] lawyer_id=2 Continue...
[2021-08-10 03:10:42,347: WARNING/MainProcess]
[2021-08-10 03:10:42,347: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[6305c7d1-c5ff-4c40-915c-6e37ecf9c440] succeeded in 0.0018151690001104726s: None
[2021-08-10 03:10:47,350: WARNING/MainProcess] lawyer_id=3 Continue...
[2021-08-10 03:10:47,350: WARNING/MainProcess]
[2021-08-10 03:10:47,350: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[3437dc9d-e3a5-4da3-8020-be52cf871aa1] succeeded in 0.004217886999867915s: None
[2021-08-10 03:10:52,340: WARNING/MainProcess] lawyer_id=4 Break now!
[2021-08-10 03:10:52,340: WARNING/MainProcess]
[2021-08-10 03:10:52,341: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[682a21e5-0531-443b-b829-17c40b9c6509] succeeded in 0.013524283999686304s: None
[2021-08-10 03:10:57,330: WARNING/MainProcess] lawyer_id=5 Break...
[2021-08-10 03:10:57,330: WARNING/MainProcess]
[2021-08-10 03:10:57,330: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[0d7dbff6-0207-441e-a3e3-2840f028b393] succeeded in 0.004878013000052306s: None
[2021-08-10 03:11:02,338: WARNING/MainProcess] lawyer_id=6 Break...
[2021-08-10 03:11:02,338: WARNING/MainProcess]
[2021-08-10 03:11:02,338: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[e4e7987f-1ebe-4fc6-b37d-a475783fa1b5] succeeded in 0.00420587299959152s: None
[2021-08-10 03:11:07,336: WARNING/MainProcess] lawyer_id=7 Break...
[2021-08-10 03:11:07,336: WARNING/MainProcess]
[2021-08-10 03:11:07,337: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[a9dc6b82-c507-47f0-9bab-073d33e24a19] succeeded in 0.005373962000703614s: None
[2021-08-10 03:11:12,327: WARNING/MainProcess] lawyer_id=8 Break...
[2021-08-10 03:11:12,328: WARNING/MainProcess]
[2021-08-10 03:11:12,328: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[ac8b57cd-2c15-4978-9a70-69d9f876d776] succeeded in 0.004143920999922557s: None
[2021-08-10 03:11:17,342: WARNING/MainProcess] lawyer_id=9 Break...
[2021-08-10 03:11:17,342: WARNING/MainProcess]
[2021-08-10 03:11:17,343: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[97fc021e-5426-43e3-b6bf-0e83c4ff4eda] succeeded in 0.004410948999975517s: None
[2021-08-10 03:11:22,327: WARNING/MainProcess] lawyer_id=10 Break...
[2021-08-10 03:11:22,327: WARNING/MainProcess]
[2021-08-10 03:11:22,331: INFO/MainProcess] Task task.lawyer.change_offered_lawyer[c3bda008-6cda-436e-a454-0e9dc8900857] succeeded in 0.007513158000620024s: None
[2021-08-10 03:11:22,332: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] received
[2021-08-10 03:11:22,338: INFO/MainProcess] Task task.lawyer.reset_cache[d6222a10-dbda-4764-add1-88e6ed93927a] succeeded in 0.004719080000540998s: None
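The example above uses a single global cache key, so two consultations being processed at the same time would share one break flag. One possible refinement is to scope the key to the consultation. The sketch below is an assumption layered on top of the answer: the helper names and the key format are hypothetical, and cache is expected to be django.core.cache.cache or any object with the same get/set methods.

```python
def break_flag_key(consultation_id):
    """Build a cache key scoped to one consultation (hypothetical naming scheme)."""
    return f"change_offered_lawyer:{consultation_id}"


def should_break(cache, consultation_id):
    """Return True if a task has already requested a break for this consultation."""
    return cache.get(break_flag_key(consultation_id)) == "break"


def request_break(cache, consultation_id):
    """Set the break flag for this consultation only; timeout=None means no expiry."""
    cache.set(break_flag_key(consultation_id), "break", timeout=None)
```

Inside the task, should_break(cache, consultation_id) would take the place of the cache.get(...) check, and request_break(cache, consultation_id) would take the place of the cache.set(...) call. The reset task would then need to delete the same scoped key.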
Design summary:
Update change_offered_lawyer to return an indicator of whether to continue or break. Then, after calling apply_async(), call get() to receive the result synchronously and break if needed.
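A minimal sketch of this synchronous approach follows. It assumes the task returns True to continue and False to break, and that a Celery result backend is configured, since get() needs one. The function name offer_lawyers_in_turn and its parameters are hypothetical; task can be the real Celery task or anything exposing the same apply_async(...).get(...) interface.

```python
def offer_lawyers_in_turn(task, consultation_id, lawyer_ids,
                          delay_seconds=60, timeout=None):
    """Dispatch one task per lawyer, waiting for each result before sending the next.

    Returns the lawyer ids that were actually dispatched.
    """
    dispatched = []
    for lawyer_id in lawyer_ids:
        result = task.apply_async(
            (consultation_id, lawyer_id), countdown=delay_seconds
        )
        dispatched.append(lawyer_id)
        # get() blocks the caller until the worker has finished this task.
        if not result.get(timeout=timeout):
            break  # the task asked to stop, so no further tasks are sent
    return dispatched
```

Because get() blocks for the whole countdown, this loop is better run from a management command or another background process than from a request handler. Unlike the other two designs, the remaining tasks are never sent to the broker at all.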
Upvotes: 1