Igor Servulo

Reputation: 371

How to get the queue that originated a task execution from Celery

So, I'm creating a monitor application that will send the logs from Celery tasks to an ELK stack.

So far, I have done this:

from project.celery import app


def monitor(app):
    state = app.events.State()

    def on_event_success(event):
        state.event(event)

        task = state.tasks.get(event['uuid'])
        if task.name:
            task_name = task.name
            task_origin = task.origin
            task_type = task.type
            task_worker = task.worker
            task_info = task.info()
            task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}\n\n".format(task_name, task_origin, task_type, task_worker, task_info['args'])
            print("SUCCESS: {}".format(task_log))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-succeeded': on_event_success
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    application = app
    monitor(app)

With this code I'm able to capture almost all the information available in the task, but I haven't managed to find a way to capture which queue the task execution originated from.

I have two queues:

CELERY_QUEUES = (
    # Persistent task queue
    Queue('celery', routing_key='celery'),
    # Persistent routine task queue
    Queue('routine', routing_key='routine')
)
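
For context, here is a sketch of how a task might end up on the `routine` queue; the task name `project.tasks.cleanup` is a made-up example, and `CELERY_ROUTES` matches the old-style setting names used above:

```python
# Route a specific task to the 'routine' queue (old-style setting name,
# matching the CELERY_QUEUES style above; 'project.tasks.cleanup' is
# a hypothetical task name used only for illustration):
CELERY_ROUTES = {
    'project.tasks.cleanup': {'queue': 'routine'},
}

# ...or pick the queue explicitly when calling the task:
# cleanup.apply_async(queue='routine')
```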

I want to know which queue originated my task execution, taking this information from the task object that was created from the event.

Upvotes: 3

Views: 1192

Answers (1)

DejanLekic

Reputation: 19787

In order to do this you need to enable the task-sent event.
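
A minimal way to do that, assuming Celery 4+'s lowercase setting names (older versions use `CELERY_SEND_TASK_SENT_EVENT` instead), is:

```python
from project.celery import app

# Enable the task-sent event so a task-sent message is published for
# every task that gets queued (Celery 4+ setting name).
app.conf.task_send_sent_event = True
```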

You also need to implement a handler for the task-sent event, just like you did with task-succeeded.

Your monitoring application should keep at least the task id (the event["uuid"]) and routing key (event["routing_key"]) from all captured task-sent events. I do this using the TTLCache from cachetools, and I use this dictionary from my task-succeeded and task-failed event handlers when I need routing-key information.

If you want the task name and arguments, for example, you need to handle the task-received event in the same way I described above...
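
As a sketch of that, a task-received handler could cache the name and args keyed by task id in the same way; the factory function and field choices below are illustrative, not part of the answer's code:

```python
def make_on_task_received(task_info):
    """Return a task-received handler that caches name/args by task id.

    task_info is any dict-like cache (e.g. a cachetools.TTLCache).
    """
    def on_task_received(event):
        # task-received events carry uuid, name, args, kwargs, retries,
        # eta, hostname, among other fields.
        entry = task_info.setdefault(event["uuid"], {})
        entry.setdefault("name", event.get("name"))
        entry.setdefault("args", event.get("args"))
    return on_task_received
```

The returned handler would then be registered under the "task-received" key in the Receiver's handlers dict, next to the other handlers.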

You may wonder why I use a TTLCache - our Celery cluster runs a few million tasks per day, and keeping all task-sent event data in memory would soon consume all available memory.

Finally, here is the code that caches the task-sent data and uses it in the task-succeeded event handler:

from cachetools import TTLCache
from project.celery import app


def monitor(app):
    state = app.events.State()

    # keep a couple of days of history in case unacknowledged tasks are retried
    task_info = TTLCache(float('inf'), 3.2 * 24 * 60 * 60)

    def on_event_success(event):
        nonlocal task_info
        state.event(event)

        task = state.tasks.get(event['uuid'])
        if task.name:
            task_name = task.name
            task_origin = task.origin
            task_type = task.type
            task_worker = task.worker
            t_info = task.info()
            task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}".format(task_name, task_origin, task_type, task_worker, t_info['args'])
            print("SUCCESS: {}".format(task_log))
            if event["uuid"] in task_info:
                cached_task = task_info[event["uuid"]]
                if "routing_key" in cached_task:
                    print("    routing_key: {}\n\n".format(cached_task["routing_key"]))

    def on_task_sent(event):
        # task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange,
        # routing_key, root_id, parent_id)
        nonlocal task_info
        if event["uuid"] not in task_info:
            task_info[event["uuid"]] = {"name": event["name"],
                                        "args": event["args"],
                                        "queue": event["queue"],
                                        "routing_key": event["routing_key"]}

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-succeeded': on_event_success,
                "task-sent": on_task_sent,
                "*": state.event
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    application = app
    monitor(app)

I never had enough time to investigate Celery's celery.events.state.State class. I do know it uses LRUCache to cache some entries, but I am not sure whether it can be used instead of the TTLCache I use in the code...

Upvotes: 2
