Reputation: 715
I am working with Flask and a remote Celery worker, using RabbitMQ as the message broker. The remote worker randomly throws the following error:
[2020-09-03 13:49:59,390: CRITICAL/MainProcess] Couldn't ack 20, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\abstract_channel.py", line 56, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed
I am using Celery version 4. Any pointers on how to avoid this error would be helpful.
Upvotes: 3
Views: 3153
Reputation: 443
I had the same issue with Celery 4.4.6 and RabbitMQ when running very long tasks. After the config change below, the same tasks ran without the error (I run the worker in solo mode). The important setting seems to be the broker heartbeat: https://www.rabbitmq.com/heartbeats.html. Setting it to 0 should disable heartbeats, so the connection should no longer be reset because of a missed heartbeat.
CELERY_BROKER_HEARTBEAT = 0
Link to celery doc: https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html#std:setting-broker_heartbeat
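If the worker is configured outside of Flask, the same setting could look roughly like the sketch below. It assumes a hypothetical module named celeryconfig.py that the worker loads with app.config_from_object('celeryconfig'); broker_url and broker_heartbeat are the lowercase Celery 4 setting names from the linked docs, and the URL is only a placeholder.

```python
# celeryconfig.py (hypothetical module name; assumed to be loaded with
# app.config_from_object('celeryconfig') on the worker side)

# Placeholder RabbitMQ URL; substitute your own user, password, host and vhost.
broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'

# 0 turns broker heartbeats off, so a long task that keeps the worker busy
# should no longer get its connection dropped for a missed heartbeat.
broker_heartbeat = 0
```

Keep in mind the trade-off: with heartbeats off, a connection that has really died is only noticed through TCP-level timeouts, which can take much longer.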
Integration with Flask should work like this:
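from flask import Flask
from celery import Celery

app = Flask(__name__)
# RabbitMQ connection string (placeholder credentials and vhost)
app.config['CELERY_BROKER_URL'] = 'amqp://myuser:mypassword@localhost:5672/myvhost'
# 0 disables the broker heartbeat
app.config['CELERY_BROKER_HEARTBEAT'] = 0

# Create the Celery app and copy the Flask config into it
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)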
I can recommend this blog post, which is the source of the code snippet: https://blog.miguelgrinberg.com/post/using-celery-with-flask
Upvotes: 6