Jcyrss

Reputation: 1800

Celery worker connection with RabbitMQ gets broken pipe error in gevent or eventlet mode

I ran into an issue where the Celery worker connection with RabbitMQ gets a broken pipe error in gevent mode, while there is no problem when the Celery worker runs in process pool mode (without gevent and without monkey patching).

After that, the Celery workers no longer get task messages from RabbitMQ until they are restarted.

The issue always happens when the Celery workers consume task messages more slowly than the Django applications produce them, and about 3 thousand messages have piled up in RabbitMQ.

Gevent version 1.1.0

Celery version 3.1.22

====== Celery log ======

[2016-08-08 13:52:06,913: CRITICAL/MainProcess] Couldn't ack 293, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 412, in sendall
    timeleft = self.__send_chunk(chunk, flags, timeleft, end)
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 351, in __send_chunk
    data_sent += self.send(chunk, flags)
  File "/usr/local/lib/python2.7/site-packages/gevent/_socket2.py", line 320, in send
    return sock.send(data, flags)
error: [Errno 32] Broken pipe

====== Rabbitmq log ======

=ERROR REPORT==== 8-Aug-2016::14:28:33 ===
closing AMQP connection <0.15928.4> (10.26.39.183:60732 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15981.4> (10.26.39.183:60736 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

=ERROR REPORT==== 8-Aug-2016::14:29:03 ===
closing AMQP connection <0.15955.4> (10.26.39.183:60734 -> 10.26.39.183:5672):
{writer,send_failed,{error,enotconn}}

A similar issue appears when the Celery worker uses eventlet.

[2016-08-09 17:41:37,952: CRITICAL/MainProcess] Couldn't ack 583, reason:error(32, 'Broken pipe')
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 93, in ack_log_error
    self.ack()
  File "/usr/local/lib/python2.7/site-packages/kombu/message.py", line 88, in ack
    self.channel.basic_ack(self.delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/amqp/channel.py", line 1584, in basic_ack
    self._send_method((60, 80), args)
  File "/usr/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/usr/local/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
    frame_type, channel, size, payload, 0xce,
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 385, in sendall
    tail = self.send(data, flags)
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 379, in send
    return self._send_loop(self.fd.send, data, flags)
  File "/usr/local/lib/python2.7/site-packages/eventlet/greenio/base.py", line 366, in _send_loop
    return send_method(data, *args)
error: [Errno 32] Broken pipe

Update: setup and load test info

We use supervisor to launch Celery with the following options:

celery worker -A celerytasks.celery_worker_init -Q default -P gevent -c 1000 --loglevel=info

Celery uses RabbitMQ as the broker.
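
The broker wiring itself is not shown here; a minimal sketch of what the celerytasks.celery_worker_init module referenced in the command above might contain is below (the broker URL, credentials, and exact contents are assumptions, not our actual file):

# celerytasks/celery_worker_init.py -- assumed minimal sketch; only the module
# name comes from the worker command above.
from celery import Celery

app = Celery(
    'celerytasks',
    broker='amqp://guest:guest@localhost:5672//',  # RabbitMQ broker URL (assumed)
)

# Route work to the same queue the worker listens on ("-Q default").
app.conf.update(CELERY_DEFAULT_QUEUE='default')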

We run 4 Celery worker processes by specifying "numprocs=4" in the supervisor configuration.

We use JMeter to emulate web access load; the Django applications produce tasks for the Celery workers to consume. Those tasks basically need to access a MySQL database to get/update some data.
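
For reference, the tasks are roughly of this shape (a minimal sketch; the model, field, and task names below are made up for illustration, not our actual code):

# celerytasks/tasks.py -- assumed sketch of the kind of task described above.
from celery import shared_task
from myapp.models import Counter  # hypothetical Django model backed by MySQL

@shared_task
def update_counter(counter_id):
    # Get/update a row in MySQL through the Django ORM.
    counter = Counter.objects.get(pk=counter_id)
    counter.hits += 1
    counter.save(update_fields=['hits'])

# A Django view enqueues work like this during the JMeter run:
#     update_counter.delay(counter_id=42)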

From the RabbitMQ web admin page, the task-producing rate is about 50 per second while the consuming rate is about 20 per second. After about 1 minute of load testing, the log file shows that many connections between RabbitMQ and Celery hit the broken pipe error.

Upvotes: 6

Views: 5346

Answers (1)

Zeus

Reputation: 1275

We noticed that this issue can also be caused by a combination of a high prefetch count and high concurrency.

We had concurrency set to 500 and the prefetch multiplier set to 100, which means the effective prefetch is 500 * 100 = 50,000 messages per worker. We had around 100k tasks piled up, and because of this configuration one worker reserved all of the tasks for itself while the other workers weren't even used. That one worker kept getting the broken pipe error and never acknowledged any task, so tasks were never cleared from the queue.

We then changed the prefetch multiplier to 3 and restarted all the workers, which fixed the issue. Since lowering the prefetch multiplier we have seen zero instances of the broken pipe error, whereas we used to see it quite often before that.
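
The change amounts to something like the following Celery 3.x configuration sketch (the module name and broker URL are assumptions; CELERYD_PREFETCH_MULTIPLIER is the Celery 3.x name of the setting):

# celeryconfig.py -- sketch of the prefetch change described above.
BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # assumed broker URL

# Effective prefetch per worker = concurrency * prefetch multiplier.
# With concurrency 500 and a multiplier of 100, one worker reserves
# 500 * 100 = 50,000 messages; with a multiplier of 3 it reserves at most
# 500 * 3 = 1,500.
CELERYD_PREFETCH_MULTIPLIER = 3

The workers then pick this up via app.config_from_object('celeryconfig') or whatever configuration mechanism you already use.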

Upvotes: 2
