sattva_venu
sattva_venu

Reputation: 715

celery worker - connection already closed error

I am working with Flask and remote celery worker, for celery communication I am using rabbitmq as message broker. The remote celery worker randomly throws error as follows:-

[2020-09-03 13:49:59,390: CRITICAL/MainProcess] Couldn't ack 20, reason:RecoverableConnectionError(None, 'connection already closed', None, '')
Traceback (most recent call last):
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 131, in ack_log_error
    self.ack(multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\kombu\message.py", line 126, in ack
    self.channel.basic_ack(self.delivery_tag, multiple=multiple)
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\channel.py", line 1394, in basic_ack
    spec.Basic.Ack, argsig, (delivery_tag, multiple),
  File "c:\users\g-us01.test\.virtualenvs\celery_doors_integration-o4w-mxzx\lib\site-packages\amqp\abstract_channel.py", line 56, in send_method
    raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed

I am using celery version 4. Any pointers on how to avoid this error would be helpful.

Upvotes: 3

Views: 3153

Answers (1)

sidi7
sidi7

Reputation: 443

I had the same issue with celery version 4.4.6 and rabbitmq when running very long tasks. I then ran the same tasks with the below config change and now it works (I run the worker in solo mode). The important config seems to be the broker heartbeat: https://www.rabbitmq.com/heartbeats.html. This should disable the heartbeats and the connection should not be reset by a missed heartbeat.

CELERY_BROKER_HEARTBEAT = 0

Link to celery doc: https://docs.celeryproject.org/en/v4.4.6/userguide/configuration.html#std:setting-broker_heartbeat

Integration with Flask should work like this:

from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] =
                            'amqp://myuser:mypassword@localhost:5672/myvhost'
app.config['CELERY_BROKER_HEARTBEAT'] = 0

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

I can recommend this blog here (source for code snippet): https://blog.miguelgrinberg.com/post/using-celery-with-flask

Upvotes: 6

Related Questions