Reputation: 1800
I ran into an issue where the Celery worker's connection to RabbitMQ hits a broken pipe error in gevent mode, while there is no problem when the Celery worker runs in process pool mode (without gevent and without monkey patching).
After that, the Celery workers no longer receive any task messages from RabbitMQ until they are restarted.
The issue always happens when the Celery workers consume task messages more slowly than the Django applications produce them, and about 3 thousand messages have piled up in RabbitMQ.
Gevent version 1.1.0
Celery version 3.1.22
====== Celery log ======
[2016-08-08 13:52:06,913: CRITICAL/MainProcess] Couldn't ack 293, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
self.ack()
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
self.channel.basic_ack(self.delivery_tag)
File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
self._send_method((60, 80), args)
File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
frame_type, channel, size, payload, 0xce,
File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 412, in sendall
timeleft = self.__send_chunk(chunk, flags, timeleft, end)
File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 351, in __send_chunk
data_sent += self.send(chunk, flags)
File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 320, in send
return sock.send(data, flags)
error: [Errno 32] Broken pipe
======= Rabbitmq log ==================
=ERROR REPORT==== 8-Aug-2016::14:28:33 ===
closing AMQP connection <0.15928.4> (10.26.39.183:60732 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}
=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15981.4> (10.26.39.183:60736 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}
=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15955.4> (10.26.39.183:60734 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}
A similar issue appears when the Celery worker uses eventlet.
[2016-08-09 17:41:37,952: CRITICAL/MainProcess] Couldn't ack 583, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
self.ack()
File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
self.channel.basic_ack(self.delivery_tag)
File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
self._send_method((60, 80), args)
File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
frame_type, channel, size, payload, 0xce,
File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 385, in sendall
tail = self.send(data, flags)
File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 379, in send
return self._send_loop(self.fd.send, data, flags)
File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 366, in _send_loop
return send_method(data, *args)
error: [Errno 32] Broken pipe
Update: setup and load test info
We use supervisor to launch Celery with the following options:
celery worker -A celerytasks.celery_worker_init -Q default -P gevent -c 1000 --loglevel=info
Celery uses RabbitMQ as the broker.
We run 4 Celery worker processes by specifying numprocs=4 in the supervisor configuration, roughly like the sketch below.
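A minimal sketch of the supervisor program section, assuming the project lives under a placeholder directory (the program name, paths, and most options are illustrative; only the command and numprocs=4 come from our setup):

; /etc/supervisor/conf.d/celery.conf -- sketch only; paths and program name are placeholders
[program:celery]
command=celery worker -A celerytasks.celery_worker_init -Q default -P gevent -c 1000 --loglevel=info
directory=/path/to/project
numprocs=4
; with numprocs > 1, supervisor requires a process_name that includes process_num
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true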
We use JMeter to emulate web access load; the Django applications produce tasks for the Celery workers to consume. Those tasks basically need to access a MySQL DB to get/update some data (a rough sketch of such a task follows).
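For illustration only, a hypothetical task of the kind described above (the app attribute, model, and field names are made up; the real tasks differ):

# celerytasks/tasks.py -- illustrative sketch, not the real code
from celerytasks.celery_worker_init import app   # assumes the Celery app object is named "app"
from myapp.models import Counter                  # hypothetical Django model backed by MySQL

@app.task
def update_counter(counter_id):
    # Each task does a small read-modify-write against MySQL,
    # so throughput is bounded by DB latency rather than CPU.
    counter = Counter.objects.get(pk=counter_id)
    counter.value += 1
    counter.save()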
According to the RabbitMQ web admin page, the task-producing rate is about 50 per second while the consuming rate is about 20 per second. After about 1 minute of load testing, the log file shows many connections between RabbitMQ and Celery hitting the broken pipe error.
Upvotes: 6
Views: 5346
Reputation: 1275
We noticed that this issue can also be caused by a combination of a high prefetch count and high concurrency.
We had concurrency set to 500 and prefetch to 100, which means the effective prefetch is 500 * 100 = 50,000 per worker.
We had around 100k tasks piled up, and because of this configuration one worker reserved all the tasks for itself while the other workers sat idle. That one worker kept getting the Broken pipe error and never acknowledged any task, so the tasks were never cleared from the queue.
We then changed the prefetch to 3 and restarted all the workers, which fixed the issue. Since lowering the prefetch we have seen zero instances of the Broken pipe error, whereas we used to see it quite often before that.
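For reference, the effective prefetch is prefetch multiplier * concurrency, so with concurrency 500 a multiplier of 3 caps each worker at 1,500 reserved messages instead of 50,000. A minimal sketch of where that setting lives, assuming the usual Celery config module (on Celery 3.x the key is CELERYD_PREFETCH_MULTIPLIER; on 4.x+ it is worker_prefetch_multiplier):

# celeryconfig.py -- sketch only; put this in whatever config module your app loads
CELERYD_PREFETCH_MULTIPLIER = 3    # Celery 3.x: each worker reserves at most 3 * concurrency messages
# worker_prefetch_multiplier = 3   # equivalent setting name on Celery 4.x and later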
Upvotes: 2