Sergey
Sergey

Reputation: 21191

How to cancel conflicting / old tasks in Celery?

I'm using Celery + RabbitMQ.
When a Celery worker isn't available all the tasks are waiting in RabbitMQ.
Just as it becomes online all this bunch of tasks is executed immediately.
Can I somehow prevent it happening?

For example there are 100 tasks (the same) waiting for a Celery worker, can I execute only 1 of them when a Celery worker comes online?

Upvotes: 0

Views: 2230

Answers (2)

Pierre
Pierre

Reputation: 13046

Since all the tasks are the same in your queue, A better way to do this is to send the task only once, to do this you need to be able to track that the task was published, for example:

To add a custom state when the task is published:

from celery import current_app
from celery.signals import after_task_publish

@after_task_publish.connect
def add_sent_state(sender=None, body=None, **kwargs):
    """Track Published Tasks."""
    # get the task instance from its name
    task = current_app.tasks.get(sender)

    # if there is no task.backend fallback to app.backend
    backend = task.backend if task else current_app.backend

    # store the task state
    backend.store_result(body['id'], None, 'SENT')

When you want to send the task you can check if the task has already been published, and since we're using a custom state the task's state won't be PENDING when it's published (which could be unkown) so we can check using:

from celery import states

# the task has a custom ID
task = task_func.AsyncResult('CUSTOM_ID')

if task.state != states.PENDING:
    # the task already exists
else:
    # send the task
    task_func.apply_async(args, kwargs, task_id='CUSTOM_ID')

I'm using this approach in my app and it's working great, my tasks could be sent multiple times and they are identified by their IDs so this way each task is sent once.

If you're still want to cancel all the tasks in the queue you can use:

# import your Celery instance
from project.celery import app

app.control.purge()

Check the Celery FAQ How do I purge all waiting tasks ?

Upvotes: 2

Chillar Anand
Chillar Anand

Reputation: 29514

There are two ways to do this.

First, Run only one worker with a concurrency of one.

celery worker -A your_app -l info -c 1

This command starts a worker with a concurrency of one. So only one task will be executed at a time. This is the preferred way to do it.

Second method is bit complicated. You need to acquire lock and release the lock to make sure only one task is executed at a time.

Alternatively, if you want, you can remove all the tasks from queue using purge command.

celery -A your_app purge

Upvotes: 1

Related Questions