zmbq

Reputation: 39013

Celery and RabbitMQ timeouts and connection resets

I'm using RabbitMQ 3.6.0 and Celery 3.1.20 on a Windows 10 machine in a Django application. Everything runs on the same computer. I've configured Celery to acknowledge late (CELERY_ACKS_LATE = True), and now I'm getting connection problems.

I start the Celery worker, and after 50-60 seconds of handling tasks, each worker thread fails with the following message:

Couldn't ack ###, reason:ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)

(### is the number of the task)

When I look at the RabbitMQ logs I see this:

=INFO REPORT==== 10-Feb-2016::22:16:16 === accepting AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672)

=INFO REPORT==== 10-Feb-2016::22:16:16 === accepting AMQP connection <0.254.0> (127.0.0.1:55373 -> 127.0.0.1:5672)

=ERROR REPORT==== 10-Feb-2016::22:17:14 === closing AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672): {writer,send_failed,{error,timeout}}

The error occurs exactly when the Celery workers are getting their connection reset.

I thought this was an AMQP heartbeat issue, so I added BROKER_HEARTBEAT = 15 to my Celery settings, but it made no difference.
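For reference, the relevant part of my Django settings.py looks roughly like this (a sketch, using the uppercase Celery 3.x setting names; the broker URL is the RabbitMQ default on localhost, which I have not changed):

```python
# settings.py -- Celery 3.1 configuration (uppercase 3.x names)
BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # default local RabbitMQ

CELERY_ACKS_LATE = True   # acknowledge only after the task finishes
BROKER_HEARTBEAT = 15     # heartbeat I added while debugging; no effect
```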

Upvotes: 7

Views: 10258

Answers (2)

Math is Hard

Reputation: 934

@bbaker's solution with CELERY_ACKS_LATE (which is task_acks_late in Celery 4.x) did not work for me on its own. My workers are in Kubernetes pods, must be run with --pool solo, and each task takes 30-60 seconds.

I solved it by also setting broker_heartbeat = 0. My full configuration:

broker_pool_limit = None
task_acks_late = True
broker_heartbeat = 0
worker_prefetch_multiplier = 1
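With that configuration, the worker is started with the solo pool, roughly like this (a sketch; "myapp" is a placeholder for your actual Celery app module):

```shell
celery -A myapp worker --pool solo -l info
```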

Upvotes: 2

bbaker

Reputation: 408

I was having a similar issue with Celery on Windows, running long tasks with concurrency=1. The following configuration finally worked for me:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

I also started the celery worker daemon with the -Ofair option:

celery -A test worker -l info -Ofair

In my limited understanding, CELERYD_PREFETCH_MULTIPLIER sets the number of messages a Celery worker prefetches from the queue. By default it is 4. If you set it to 1, each worker consumes only one message and completes that task before consuming another. My long-running tasks kept losing the RabbitMQ connection in the middle of the task, and the task was then re-attempted whenever other messages/tasks were waiting in the Celery queue.
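The effective prefetch count is the multiplier times the worker's concurrency, so even with one process the defaults let a worker hold several unacknowledged messages. A quick sketch of that arithmetic (the function name is mine, for illustration):

```python
def effective_prefetch(prefetch_multiplier, concurrency):
    """Messages a worker may hold unacknowledged at once:
    multiplier * number of worker processes/threads."""
    return prefetch_multiplier * concurrency

# Defaults: multiplier 4, concurrency 1 -> 4 messages held at once
print(effective_prefetch(4, 1))  # 4
# With CELERYD_PREFETCH_MULTIPLIER = 1 and concurrency 1 -> one at a time
print(effective_prefetch(1, 1))  # 1
```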

The following option was also specific to my situation:

CELERYD_CONCURRENCY = 1

Setting concurrency to 1 made sense for me because my long-running tasks needed a large amount of RAM, so each one had to run solo.
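Putting the pieces together, my Celery 3.x configuration for this case looks roughly like this (a sketch; these are the uppercase 3.x setting names, placed in Django's settings.py):

```python
# settings.py -- Celery 3.x settings for long-running tasks on Windows
CELERY_ACKS_LATE = True            # ack only after the task completes
CELERYD_PREFETCH_MULTIPLIER = 1    # fetch one message at a time
CELERYD_CONCURRENCY = 1            # one RAM-heavy task at a time
```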

Upvotes: 7
