Jonathan DEKHTIAR

Reputation: 3536

RabbitMQ or Redis exploding Celery queues with Django 2.0

I am encountering a problem with Celery and Django 2.0. I have two running environments: a development environment already migrated to Django 2.0, and a production environment still running Django 1.11.

The production environment should be migrated to Django 2.0 as soon as possible, but I can't do it without fixing this Celery issue first. The development environment is there to ensure that everything runs fine before upgrading the production servers.

The question

  1. What changed with Django 2.0 that makes a system which was stable under Django 1.11 become unstable, with exploding queue sizes and the same effect on both RabbitMQ and Redis?

  2. If a task is never consumed, how can it be automatically deleted by Redis/RabbitMQ? (A sketch of the task-level expires option follows below.)
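
For the second point, Celery's task-level expires option is one way to let stale tasks be discarded. Below is a minimal sketch, assuming the refresh_all_rss_feeds task from the beat schedule further down and an illustrative 60-second expiry; it complements (but does not replace) a broker-side TTL such as RabbitMQ's x-message-ttl.

from feedcrunch.tasks import refresh_all_rss_feeds

# Hypothetical one-off call: if no worker starts the task within 60 seconds,
# the worker that eventually receives it marks it as revoked instead of running it.
# The message still sits in the broker until a worker picks it up, which is why a
# broker-side TTL (x-message-ttl) is also needed to actually drop old messages.
refresh_all_rss_feeds.apply_async(expires=60)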

Celery worker is launched as follows

The exact same commands are used for both environments.

celery beat -A application --loglevel=info --detach 
celery events -A application --loglevel=info --camera=django_celery_monitor.camera.Camera --frequency=2.0 --detach
celery worker -A application -l info --events

Application Settings

Since I migrated my development environment to Django 2.0, my RabbitMQ (or Redis) queues are literally exploding in size and my database instances keep scaling up. It seems like tasks are no longer removed from the queues.

I have to manually clean up the celery queue, which after a few days contains more than 250k tasks. The TTL seems to be set to "-1", but I can't figure out how to set it from Django.

After a few hours, I have more than 220k tasks waiting to be processed, and the number keeps growing.

I use the following settings, defined in settings.py:

Warning: the names used might not be the exact Celery setting names; a remap is done in celery.py to assign the values correctly (see the sketch after the settings block).

# Celery Configuration
broker_url = "broker_url" # Redis or RabbitMQ URL; switching brokers doesn't change anything.
broker_use_ssl=True

accept_content = ['application/json']

worker_concurrency = 3

result_serializer = 'json'
result_expires=7*24*30*30

task_serializer = 'json'
task_acks_late=True # Acknowledge the message only after the task has completed
task_reject_on_worker_lost=True
task_time_limit=90
task_soft_time_limit=60
task_always_eager = False
task_queues=[
    Queue(
        'celery',
        Exchange('celery'),
        routing_key = 'celery',
        queue_arguments = {
            'x-message-ttl': 60 * 1000 # 60 000 ms = 60 secs.
        }
    )
]

event_queue_expires=60
event_queue_ttl=5

beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
beat_max_loop_interval=10
beat_sync_every=1

monitors_expire_success  = timedelta(hours=1)
monitors_expire_error  = timedelta(days=3)
monitors_expire_pending = timedelta(days=5)

beat_schedule = {
    'refresh_all_rss_subscribers_count': {
        'task': 'feedcrunch.tasks.refresh_all_rss_subscribers_count',
        'schedule': crontab(hour=0, minute=5), # Every day at 00:05
        'options': {'expires': 20 * 60} # 20 minutes
    },
    'clean_unnecessary_rss_visits': {
        'task': 'feedcrunch.tasks.clean_unnecessary_rss_visits',
        'schedule': crontab(hour=0, minute=20), # Every day at 00:20
        'options': {'expires': 20 * 60} # 20 minutes
    },
    'celery.backend_cleanup': {
        'task': 'celery.backend_cleanup',
        'schedule': crontab(minute='30'), # Every hour at minute 30
        'options': {'expires': 50 * 60} # 50 minutes
    },
    'refresh_all_rss_feeds': {
        'task': 'feedcrunch.tasks.refresh_all_rss_feeds',
        'schedule': crontab(minute='40'), # Every hour at minute 40
        'options': {'expires': 30 * 60} # 30 minutes
    },
}
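
For context, the remap mentioned in the warning above happens in celery.py. Here is a minimal sketch of what such a file could look like, assuming the project module is named application as in the worker commands above (the real file may remap names differently):

import os
from celery import Celery

# Load the Django settings module before the Celery app reads its configuration.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'application.settings')

app = Celery('application')

# Read the lowercase Celery options (broker_url, task_queues, beat_schedule, ...)
# directly from settings.py; this is also where setting names can be remapped.
app.config_from_object('django.conf:settings')

# Discover tasks.py modules in the installed Django apps (e.g. feedcrunch.tasks).
app.autodiscover_tasks()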

Worker Logs examples

(Worker logs screenshot)

One more thought: is it normal that the "expires" and "timelimit" settings are shown as None in the worker logs above?

Upvotes: 4

Views: 2727

Answers (1)

Jonathan DEKHTIAR

Reputation: 3536

I think I have found a temporary solution; it seems that the Celery library has some bugs, and we have to wait for the 4.2.0 release.

I recommend having a look at https://github.com/celery/celery/issues/4041.

As a temporary fix, I recommend installing from the following commit, which seems to resolve the issue: https://github.com/celery/celery/commit/be55de6

git+https://github.com/celery/celery.git@be55de6#egg=celery
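
Putting that line in requirements.txt in place of the usual celery pin installs Celery directly from that commit until the 4.2.0 release is out.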

Upvotes: 1
