Reputation: 10048
I'm creating Tasks at a rate of 5 Tasks per second. I can see in RabbitMQ Message rates incoming an average of 5.2/s, I have 240 consumers distributed in 4 Virtual machines (60 per VM), each worker process a Task that last 20 seconds. In theory I'm supposed to handle 100K task without queuing.
I see a large number of Unacked messages. How to get rid of Unacked messages or add a timer to kill them, does that point to be a problem in my worker side?
How can I recover unacknowledged AMQP messages from other channels than my connection's own?
Queues tab
Ready Unacked Total incoming deliver / get ack
21,884 960 22,844 5.0/s 0.40/s 0.40/s
Exchange tab: stackoverflow direct D 5.0/s 5.0/s
This is my celeryconfig file.
CELERYD_CHDIR = settings.filepath
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "US/Eastern"
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml']
CELERY_IGNORE_RESULT = True
CELERY_RESULT_BACKEND = "amqp"
CELERY_RESULT_PERSISTENT = True
BROKER_URL = 'amqp://stackoverflow:stackoverflow@rabbitmq:5672'
BROKER_CONNECTION_TIMEOUT = 15
BROKER_CONNECTION_MAX_RETRIES = 5
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TASK_RESULT_EXPIRES = 7200
CELERY_IMPORTS = ("cc.modules.stackoverflow")
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('gold', Exchange('stackoverflow'), routing_key='stackoverflow.gold'),
Queue('silver', Exchange('stackoverflow'), routing_key='stackoverflow.silver'),
Queue('bronze', Exchange('stackoverflow'), routing_key='stackoverflow.bronze'),
)
CELERY_DEFAULT_EXCHANGE = "stackoverflow"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "default"
CELERY_TRACK_STARTED = True
CELERY_ROUTES = {
'process_call' : {'queue': 'gold', 'routing_key': 'stackoverflow.gold', 'exchange': 'stackoverflow',},
'process_recording': {'queue': 'silver', 'routing_key': 'stackoverflow.silver', 'exchange': 'stackoverflow',},
'process_campaign' : {'queue': 'bronze', 'routing_key': 'stackoverflow.bronze', 'exchange': 'stackoverflow',}
}
Upvotes: 0
Views: 5076
Reputation: 447
Message acknowledgement acts like transaction in SQL like commit you have to acknowledge the message received from RabbitMq. This function help to avoid the message loss in your system.
Navigate to Message acknowledgment title https://www.rabbitmq.com/tutorials/tutorial-two-python.html
Upvotes: 1