Reputation: 371
So, I'm creating a monitor application that will send the logs from Celery tasks to an ELK stack.
So far, I have done this:
from project.celery import app

def monitor(app):
    state = app.events.State()

    def on_event_success(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])

        if task.name:
            task_name = task.name
            task_origin = task.origin
            task_type = task.type
            task_worker = task.worker
            task_info = task.info()

            task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}\n\n".format(
                task_name, task_origin, task_type, task_worker, task_info['args'])
            print "SUCCESS: {}".format(task_log)

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-succeeded': on_event_success
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    application = app
    monitor(app)
With this code I'm able to capture almost all of the information available on the task, but I didn't manage to find a way to capture which queue the task execution originated from.
I have two queues:
CELERY_QUEUES = (
    # Persistent task queue
    Queue('celery', routing_key='celery'),
    # Persistent routine task queue
    Queue('routine', routing_key='routine')
)
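For reference, a task ends up in the 'routine' queue when it is sent like this (routine_task here is just a placeholder for one of my real tasks):

    from project.tasks import routine_task   # placeholder task

    # explicitly routed to the 'routine' queue instead of the default 'celery' one
    routine_task.apply_async(args=(1, 2), queue='routine')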
I want to know which queue originated my task execution, taking this information from the task object that was created from the event.
Upvotes: 3
Views: 1192
Reputation: 19787
In order to do this you need to enable the task-sent event. You also need to implement a handler for the task-sent event, just like you did with task-succeeded.
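Task-sent events are not emitted by default, so they have to be switched on in the Celery configuration. A minimal sketch, assuming Celery 4.x lowercase setting names (older versions use the uppercase equivalents shown in the comments):

    # celeryconfig.py (sketch)
    task_send_sent_event = True        # clients emit task-sent events
    worker_send_task_events = True     # workers emit task-received/succeeded/failed
                                       # (same as starting the worker with -E)

    # on Celery 3.x the equivalents are:
    # CELERY_SEND_TASK_SENT_EVENT = True
    # CELERY_SEND_EVENTS = True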
Your monitoring application should keep at least the task id (event["uuid"]) and routing key (event["routing_key"]) from all captured task-sent events. I do this using a TTLCache from cachetools, and I use this dictionary from my task-succeeded and task-failed event handlers whenever I need routing-key information.
If you also want the task name and arguments, for example, you need to handle the task-received event the same way I described above (a sketch of such a handler follows).
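A minimal sketch of such a handler, assuming it sits inside monitor() next to the other handlers and is registered under 'task-received' in the Receiver:

    def on_task_received(event):
        # task-received carries uuid, name, args, kwargs, retries, eta, hostname, ...
        nonlocal task_info
        entry = task_info.setdefault(event["uuid"], {})
        # keep whatever the worker reported, without overwriting task-sent data
        entry.setdefault("name", event.get("name"))
        entry.setdefault("args", event.get("args"))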
You may wonder why I use TTLCache - our Celery cluster runs a few million tasks per day, and keeping all task-sent event data in memory forever would soon eat up all the available memory.
Finally, here is the code that caches the task-sent data and uses it in the task-succeeded event handler:
from cachetools import TTLCache
from project.celery import app

def monitor(app):
    state = app.events.State()
    # keep a couple of days of history in case not acknowledged tasks are retried
    task_info = TTLCache(float('inf'), 3.2 * 24 * 60 * 60)

    def on_event_success(event):
        nonlocal task_info
        state.event(event)
        task = state.tasks.get(event['uuid'])

        if task.name:
            task_name = task.name
            task_origin = task.origin
            task_type = task.type
            task_worker = task.worker
            t_info = task.info()

            task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}".format(
                task_name, task_origin, task_type, task_worker, t_info['args'])
            print("SUCCESS: {}".format(task_log))

            if event["uuid"] in task_info:
                cached_task = task_info[event["uuid"]]
                if "routing_key" in cached_task:
                    print("  routing_key: {}\n\n".format(cached_task["routing_key"]))

    def on_task_sent(event):
        # task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange,
        #           routing_key, root_id, parent_id)
        nonlocal task_info
        if event["uuid"] not in task_info:
            task_info[event["uuid"]] = {"name": event["name"],
                                        "args": event["args"],
                                        "queue": event["queue"],
                                        "routing_key": event["routing_key"]}

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-succeeded': on_event_success,
            "task-sent": on_task_sent,
            "*": state.event
        })
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == '__main__':
    application = app
    monitor(app)
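For completeness, a task-failed handler can look up the same cache; a rough sketch, again meant to live inside monitor() and to be registered under 'task-failed' in the Receiver:

    def on_event_failure(event):
        nonlocal task_info
        state.event(event)
        task = state.tasks.get(event['uuid'])
        # fall back to 'unknown' if the task-sent event was never seen (or expired)
        routing_key = task_info.get(event["uuid"], {}).get("routing_key", "unknown")
        print("FAILED: {} (routing_key: {}, exception: {})".format(
            task.name, routing_key, event.get("exception")))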
I never had enough time to investigate Celery's celery.events.state.State class. I do know it uses an LRUCache to cache some entries, but I am not sure whether that could be used instead of the TTLCache in my code...
Upvotes: 2