Reputation: 23
I'm looking to execute multiple related Celery tasks within another Celery task. Is this even possible?
The code below will give a better idea of what I am trying to accomplish:
import logging

from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task(ignore_result=True)
def test_job():
    try:
        another_test_job.delay()
        yet_another_test_job.delay()
    except Exception as e:
        logger.error(f'Something happened when collecting edr data ===> {e}')
        raise


@shared_task(ignore_result=True)
def another_test_job():
    try:
        print('this is another test')
    except Exception as e:
        logger.error(f'Something happened when collecting edr data ===> {e}')
        raise


@shared_task(ignore_result=True)
def yet_another_test_job():
    try:
        print('this is yet another test')
    except Exception as e:
        logger.error(f'Something happened when collecting edr data ===> {e}')
        raise
My goal is to consolidate the number of tasks I schedule, essentially grouping my tasks under one parent task.
I have tried executing my task manually using test_job.apply(), which results in:
In [2]: test_job.apply()
2021-09-30 13:57:49,196 amqp DEBUG Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'someclustername', 'copyright': 'Copyright (c) 2007-2020 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 22.3.4.11', 'product': 'RabbitMQ', 'version': '3.8.9'}, mechanisms: [b'PLAIN'], locales: ['en_US']
2021-09-30 13:57:49,259 amqp DEBUG using channel_id: 1
2021-09-30 13:57:49,307 amqp DEBUG Channel open
2021-09-30 13:57:49,431 celery.app.trace INFO Task myapp.tasks.jobs.test_job[3d0c3a93-157c-46bf-b592-ebce0850c49e] succeeded in 0.4412529049999989s: None
Out[2]: <EagerResult: 3d0c3a93-157c-46bf-b592-ebce0850c49e>
If I test the individual jobs directly, they work just fine, e.g. another_test_job.apply():
In [3]: another_test_job.apply()
this is another test
2021-09-30 13:59:38,601 celery.app.trace INFO Task myapp.tasks.jobs.another_test_job[6499aaff-bc88-4df1-9f77-78ff68f29915] succeeded in 0.00015368999999054722s: None
Out[3]: <EagerResult: 6499aaff-bc88-4df1-9f77-78ff68f29915>
Can someone explain to me what is happening?
Upvotes: 2
Views: 2155
Reputation: 493
Unfortunately, it doesn't work this way. When you call test_job.apply(), only the outer task executes eagerly in your current process; the inner .delay() calls just publish messages to the broker, so the subtasks run on a worker rather than in your shell. What you need instead is to create a Celery group and trigger the group's execution. To "tie" all the executions together, pass a shared reference to the tasks when adding them to the group, make each of them report to a central resource using that reference, and then check the central resource to see whether every task in the group has reported.
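For reference, a minimal sketch using Celery's group primitive with the question's own tasks might look like this (dispatch only, without the central-resource reporting described above):

from celery import group

# Build one signature per task and dispatch them together as a single group.
job = group(another_test_job.s(), yet_another_test_job.s())
group_result = job.apply_async()  # one GroupResult covering both tasks
print(group_result.id)            # a single id for the whole group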
I will paste here some code I've been using to manage Celery group tasks. You do need a central resource (a database, for example) and you need to make your tasks report to it, so you can check the progress there. But the code I'm sharing should help you understand how to manage a Celery group; I use these classes so that I don't have to deal with Celery technicalities while coding.
You can remove the Django dependencies and swap them for whatever your project needs.
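For context, the _initialize method below expects a CELERY_TOOLS_APP entry in settings.py holding a (module, attribute) pair that points at your Celery application object; the paths here are placeholders for your project:

# settings.py -- hypothetical module path and attribute name
CELERY_TOOLS_APP = ('myproject.celery', 'app')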
import importlib

from celery import group
from celery.result import AsyncResult
from django.conf import settings
from django.core.exceptions import ValidationError


class CeleryEntity:
    """Base wrapper that lazily resolves the Celery app from Django settings."""

    def __init__(self, uuid=None):
        self.uuid = uuid
        self._celery_app = None
        self._result = None
        self._initialize()

    def __str__(self):
        return self.uuid

    def _initialize(self, celery_app=None):
        if not celery_app:
            # CELERY_TOOLS_APP is a (module, attribute) pair pointing at the Celery app.
            celery_app_reference = getattr(settings, 'CELERY_TOOLS_APP', None)
            if celery_app_reference:
                celery_app = getattr(importlib.import_module(celery_app_reference[0]), celery_app_reference[1])
        self._celery_app = celery_app

    def _validate_celery_app(self):
        self._initialize()
        if not self._celery_app:
            raise ValidationError(
                'No Celery App Object. Check your CELERY_TOOLS_APP on settings.py')


class CeleryTaskGroup(CeleryEntity):
    def __init__(self, uuid=None, save=False):
        super().__init__(uuid)
        self.save = save
        self._tasks = []
        self._result_set = None

    def __str__(self):
        return self.uuid

    def _validate_tasks(self):
        if not self._tasks:
            raise ValidationError('No Tasks defined. Add tasks using append_task method')

    def _get_result_set(self):
        # restore_group only works if the GroupResult was saved to the result backend.
        if not self._result_set:
            self._validate_celery_app()
            self._result_set = self._celery_app.backend.restore_group(str(self.uuid))
        return self._result_set

    def length(self):
        return len(self._tasks)

    def stats(self):
        """Return a progress statistics dictionary for the group."""
        results = self._get_result_set()
        if results:
            ready = results.ready()
            successful = results.successful()
            completed = results.completed_count()
        else:
            ready = False
            successful = False
            completed = 0
        return {
            'ready': ready,
            'successful': successful,
            'completed': completed,
        }

    def append_task(self, task_name, params=()):
        self._tasks.append(self._celery_app.signature(task_name, params))

    def run_now(self, queue=None, wait=False):
        self._validate_celery_app()
        self._validate_tasks()
        job = group(self._tasks)
        group_results = job.apply_async(queue=queue)
        if self.save:
            # Persist the GroupResult so it can be restored later by uuid.
            group_results.save()
        self.uuid = group_results.id
        if wait:
            group_results.join()
        return group_results

    def revoke_process(self):
        grp = self._get_group()
        if grp:
            grp.revoke(terminate=True)
            self._delete_group()

    def _get_group(self):
        self._validate_celery_app()
        grp = self._celery_app.backend.restore_group(str(self.uuid))
        return self._celery_app.GroupResult(id=str(self.uuid), results=grp) if grp else None

    def _delete_group(self):
        self._validate_celery_app()
        self._celery_app.backend.delete_group(str(self.uuid))


class CeleryTask(CeleryEntity):
    def _get_result(self):
        if not self._result:
            self._validate_celery_app()
            self._result = AsyncResult(str(self.uuid), app=self._celery_app)
        return self._result

    def status(self):
        result = self._get_result()
        # If no workers are registered, the reported state cannot be trusted.
        workers = self._celery_app.control.inspect().registered()
        if result.id == 'None' or not workers:
            return 'NO_TASK'
        return result.state

    def revoke(self):
        result = self._get_result()
        result.revoke(terminate=True)

    def run(self, task_name, params, queue=None):
        result = self._celery_app.signature(task_name, params).apply_async(queue=queue)
        self.uuid = result.id
        return result
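A hypothetical usage of these classes could look like the following; the task names and queue are placeholders, and save=True is needed so stats() can later restore the group from the result backend:

# Assemble the group, dispatch it, and check progress later by uuid.
task_group = CeleryTaskGroup(save=True)
task_group.append_task('myapp.tasks.jobs.another_test_job')
task_group.append_task('myapp.tasks.jobs.yet_another_test_job')
task_group.run_now(queue='celery')
print(task_group.uuid)     # persist this id to query the group later
print(task_group.stats())  # e.g. {'ready': False, 'successful': False, 'completed': 0}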
Upvotes: 1