Reputation: 7069
I'm using Celery with the following configuration:
from celery import Celery
from kombu import Exchange, Queue

default_exchange = Exchange('default', type='direct')
latest_exchange = Exchange('latest', type='direct')

shared_celery_config = {
    'BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'redis://localhost:6379/0',
    'CELERY_DEFAULT_EXCHANGE': 'default',
    'CELERY_DEFAULT_ROUTING_KEY': 'default',
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (
        Queue('default', default_exchange, routing_key='default'),
        Queue('latest', latest_exchange, routing_key='latest'),
    ),
}

celery = Celery('tasks', config_source=shared_celery_config)
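For context, this is roughly how a task is defined and dispatched against this configuration (the add task and its arguments are just placeholders):

```python
# tasks.py (continued) -- placeholder task to reproduce the problem
@celery.task
def add(x, y):
    return x + y

# Dispatch to the non-default queue explicitly; omitting queue= uses 'default'
result = add.apply_async(args=(2, 3), queue='latest')
print(result.status)  # reports PENDING until a worker picks the task up
```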
But when I create a task, none of the workers consumes it and nothing happens. I start the workers with:

celery worker -A tasks --loglevel=debug --hostname=worker1

I can see them in the ps aux | grep celery output, but when I run a command to get statistics, such as celery -A tasks status, I get the following error message:

Error: No nodes replied within time constraint.

Therefore all tasks stay in the PENDING state. I believe this is some misconfiguration, but I can't figure out what's wrong or how to debug such a thing. Any advice would be very helpful.
Upvotes: 1
Views: 890
Reputation: 8032
If you're using queues, make sure that at least one of your DAGs is assigned to the worker; that is, the AIRFLOW__OPERATORS__DEFAULT_QUEUE value in the worker's docker-compose.yaml file should match the default_args['queue'] value in the DAG's Python file.
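As a sketch, the matching rule above boils down to this (the queue name and both values are hypothetical):

```python
# Hypothetical values: the worker's env var (set in docker-compose.yaml)
# must name the same queue as the DAG's default_args.
worker_env = {"AIRFLOW__OPERATORS__DEFAULT_QUEUE": "latest"}  # worker side
default_args = {"queue": "latest"}                            # DAG file side

# If these differ, tasks are enqueued to a queue no worker listens on
# and they sit in a queued/pending state forever.
queues_match = worker_env["AIRFLOW__OPERATORS__DEFAULT_QUEUE"] == default_args["queue"]
print(queues_match)  # → True
```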
Upvotes: 0
Reputation: 7069
The issue was resolved. I have a web application that uses gevent, and in the tasks module I had an import from another module that calls monkey.patch_all(). Somehow this prevents billiard, and therefore the worker, from starting its pool, which leads to these symptoms. Anyway, do not use gevent at all, imho.
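For anyone hitting the same thing, the shape of the fix is to keep monkey.patch_all() out of every module the Celery worker imports; a sketch, with hypothetical module names:

```python
# web.py -- hypothetical web entrypoint: the only place that patches.
# gevent wants patching to happen before other imports, so it lives here.
from gevent import monkey
monkey.patch_all()

import shared_logic  # the web process gets the patched stdlib

# tasks.py -- imported by the Celery worker; it must not pull in any
# module that calls monkey.patch_all() at import time, otherwise the
# prefork (billiard) pool fails to start.
from celery import Celery
from shared_logic import do_work  # shared_logic itself must not patch
```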
Upvotes: 0