toasty89
toasty89

Reputation: 23

Execute multiple celery tasks inside a celery task

I'm looking to execute multiple related Celery tasks from within another Celery task. Is this even possible?

The code below will give a better idea of what I am trying to accomplish.

@shared_task(ignore_result=True)
def test_job():
    """Parent task: publish both child jobs onto the broker.

    Note that ``.delay()`` only enqueues the children — a worker must
    pick them up, so this task itself produces no printed output.
    """
    try:
        another_test_job.delay()
        yet_another_test_job.delay()
    except Exception as exc:
        logger.error(f'Something happened when collecting edr data ===> {exc}')
        raise


@shared_task(ignore_result=True)
def another_test_job():
    """Trivial child task: print a marker line."""
    try:
        print('this is another test')
    except Exception as exc:
        logger.error(f'Something happened when collecting edr data ===> {exc}')
        raise


@shared_task(ignore_result=True)
def yet_another_test_job():
    """Trivial child task: print a marker line.

    (The blank line the original left between the decorator and the
    ``def`` was legal but inconsistent with the sibling tasks; removed
    per PEP 8.)
    """
    try:
        print('this is yet another test')
    except Exception as e:
        logger.error(f'Something happened when collecting edr data ===> {e}')
        raise

My goal is to consolidate the number of tasks I schedule, essentially grouping my tasks under one parent task.

I have tried executing my task manually using test_job.apply()

results in

In [2]: test_job.apply()                                                                                                                                       
2021-09-30 13:57:49,196 amqp                                DEBUG    Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'someclustername', 'copyright': 'Copyright (c) 2007-2020 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 22.3.4.11', 'product': 'RabbitMQ', 'version': '3.8.9'}, mechanisms: [b'PLAIN'], locales: ['en_US']
2021-09-30 13:57:49,259 amqp                                DEBUG    using channel_id: 1
2021-09-30 13:57:49,307 amqp                                DEBUG    Channel open
2021-09-30 13:57:49,431 celery.app.trace                    INFO     Task myapp.tasks.jobs.test_job[3d0c3a93-157c-46bf-b592-ebce0850c49e] succeeded in 0.4412529049999989s: None
Out[2]: <EagerResult: 3d0c3a93-157c-46bf-b592-ebce0850c49e>

If I test individual jobs directly, it works just fine... another_test_job.apply()

In [3]: another_test_job.apply()                                                                                                                                                            
this is another test
2021-09-30 13:59:38,601 celery.app.trace                    INFO     Task myapp.tasks.jobs.another_test_job[6499aaff-bc88-4df1-9f77-78ff68f29915] succeeded in 0.00015368999999054722s: None
Out[3]: <EagerResult: 6499aaff-bc88-4df1-9f77-78ff68f29915>

Can someone explain to me what is happening?

Upvotes: 2

Views: 2155

Answers (1)

Gustavo Gonçalves
Gustavo Gonçalves

Reputation: 493

Unfortunately, it doesn't work this way. You need to create a Celery Group and trigger the group execution. To be able to "tie" all the executions, pass variables to the tasks when adding them to the Group, so you can make all of them report to a central resource, using this reference. Then check the central resource to see if all tasks in the Group have already been reported.

I will paste here a code I've been using to manage Celery Group tasks. You do need a central resource (like a database for example) and make your tasks report to it, so you can go there and check the progress. But the code I'm sharing can help you to understand how to manage a Celery Group. I use them so I do not handle Celery technicalities when coding.

The Django dependencies you can remove and swap for your project needs.

import importlib

from celery import group
from celery.result import AsyncResult

from django.core.exceptions import ValidationError
from django.conf import settings


class CeleryEntity:
    """Thin wrapper around a Celery application resolved from Django settings.

    Subclasses use ``self._celery_app`` to build signatures, inspect
    workers, and talk to the result backend. ``settings.CELERY_TOOLS_APP``
    is expected to be a 2-item reference ``(module_path, attribute_name)``
    pointing at the Celery app object.
    """

    def __init__(self, uuid=None):
        # uuid identifies a previously started task/group; may be None
        # until a run method assigns one.
        self.uuid = uuid
        self._celery_app = None
        self._result = None
        self._initialize()

    def __str__(self):
        # Bug fix: the original returned self.uuid directly, which makes
        # str(obj) raise TypeError whenever uuid is None.
        return str(self.uuid)

    def _initialize(self, celery_app=None):
        """Resolve the Celery app from settings.CELERY_TOOLS_APP unless one is given.

        Bug fix: the original subscripted ``celery_app_reference`` without a
        None guard, so a missing/unset ``CELERY_TOOLS_APP`` crashed ``__init__``
        with a TypeError instead of leaving ``_celery_app`` unset for
        ``_validate_celery_app`` to report via ValidationError.
        """
        if not celery_app:
            celery_app_reference = getattr(settings, 'CELERY_TOOLS_APP', None)
            if celery_app_reference:
                celery_app = getattr(
                    importlib.import_module(celery_app_reference[0]),
                    celery_app_reference[1],
                )
        self._celery_app = celery_app

    def _validate_celery_app(self):
        """Re-resolve the app and raise ValidationError if still unavailable."""
        self._initialize()
        if not self._celery_app:
            raise ValidationError(
                'No Celery App Object. Check your CELERY_TOOLS_APP on settings.py')

  

class CeleryTaskGroup(CeleryEntity):
    """Build, launch and monitor a Celery group of task signatures.

    Typical flow: call :meth:`append_task` one or more times, then
    :meth:`run_now` to launch the group; afterwards use :meth:`stats`
    and :meth:`revoke_process` against the stored group uuid.
    """

    # NOTE(review): ``__str__`` and ``_initialize`` were duplicated
    # verbatim from CeleryEntity in the original; they are simply
    # inherited here — behavior is unchanged.

    def __init__(self, uuid=None, save=False):
        super().__init__(uuid)
        # When True, run_now() persists the GroupResult in the result
        # backend so it can later be restored by uuid.
        self.save = save
        self._tasks = []          # signatures queued via append_task()
        self._result_set = None   # cached GroupResult restored from the backend

    def _validate_tasks(self):
        """Raise ValidationError when no tasks have been appended.

        Bug fix: the original ``return``-ed the ValidationError instance
        instead of raising it, so run_now() silently launched an empty group.
        """
        if not self._tasks:
            raise ValidationError('No Tasks defined. Add tasks using append_task method')

    def _get_result_set(self):
        """Restore (and cache) the GroupResult for self.uuid from the backend."""
        if not self._result_set:
            self._validate_celery_app()
            self._result_set = self._celery_app.backend.restore_group(str(self.uuid))
        return self._result_set

    def length(self):
        """Number of signatures queued locally (not necessarily launched yet)."""
        return len(self._tasks)

    def stats(self):
        """Return a progress snapshot for the launched group.

        Keys: ``ready`` (all tasks finished), ``successful`` (all tasks
        succeeded), ``completed`` (count of finished tasks). Falls back to
        an empty snapshot when no result set can be restored.
        """
        results = self._get_result_set()

        if results:
            ready = results.ready()
            successful = results.successful()
            completed = results.completed_count()
        else:
            ready = False
            successful = False
            completed = 0

        return {
            'ready': ready,
            'successful': successful,
            'completed': completed,
        }

    def append_task(self, task_name, params=()):
        """Queue a signature for *task_name* with positional *params*.

        NOTE(review): assumes ``_initialize`` already resolved the app —
        no explicit ``_validate_celery_app()`` here, as in the original.
        """
        self._tasks.append(self._celery_app.signature(task_name, params))

    def run_now(self, queue=None, wait=False):
        """Launch all queued signatures as a single Celery group.

        Stores the group id on ``self.uuid``; optionally saves the
        GroupResult (``self.save``) and/or blocks until every task
        finishes (``wait``).
        """
        self._validate_celery_app()
        self._validate_tasks()

        group_results = group(self._tasks).apply_async(queue=queue)
        if self.save:
            group_results.save()

        self.uuid = group_results.id

        if wait:
            # join() blocks until all results arrive; avoid calling this
            # from inside another task.
            group_results.join()

        return group_results

    def revoke_process(self):
        """Terminate every task in the group and drop its saved result."""
        grp = self._get_group()
        if grp:
            grp.revoke(terminate=True)
        self._delete_group()

    def _get_group(self):
        """Rebuild the GroupResult from the backend, or None if unknown."""
        self._validate_celery_app()
        grp = self._celery_app.backend.restore_group(str(self.uuid))
        return self._celery_app.GroupResult(id=str(self.uuid), results=grp) if grp else None

    def _delete_group(self):
        """Remove the saved group result from the backend."""
        self._validate_celery_app()
        self._celery_app.backend.delete_group(str(self.uuid))


class CeleryTask(CeleryEntity):
    """Handle on a single Celery task identified by ``self.uuid``."""

    def _get_result(self):
        """Lazily build and cache the AsyncResult for this task's uuid."""
        if not self._result:
            self._validate_celery_app()
            self._result = AsyncResult(str(self.uuid), app=self._celery_app)
        return self._result

    def status(self):
        """Return the Celery state, or 'NO_TASK' when nothing can be tracked."""
        res = self._get_result()
        registered = self._celery_app.control.inspect().registered()
        # str(None) == 'None' is how an unset uuid shows up on the result id.
        if not registered or res.id == 'None':
            return 'NO_TASK'
        return res.state

    def revoke(self):
        """Forcefully terminate the tracked task."""
        self._get_result().revoke(terminate=True)

    def run(self, task_name, params, queue=None):
        """Fire *task_name* asynchronously and remember its id on self.uuid."""
        outcome = self._celery_app.signature(task_name, params).apply_async(queue=queue)
        self.uuid = outcome.id
        return outcome
Upvotes: 1

Related Questions