ItayB

Reputation: 11337

socket.error: timed out (Celery & RabbitMQ running in docker containers)

I'm trying to bring up the official Celery and RabbitMQ Docker containers:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq
docker run --link some-rabbit:rabbit --name some-celery -d celery

I've checked the worker log to make sure everything is fine:

# docker logs some-celery

[2016-10-20 11:05:50,357: WARNING/MainProcess] /usr/local/lib/python3.5/site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
[2016-10-20 11:05:50,419: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused.
Trying again in 2.00 seconds...

[2016-10-20 11:05:52,430: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused.
Trying again in 4.00 seconds...

[2016-10-20 11:05:57,611: WARNING/MainProcess] celery@93b7b6c0b40b ready.

Now, on the host, I'm using this tasks.py:

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest:[email protected]/')

@app.task(name='tasks.add')
def add(x, y):
    return x + y

and from the host command line I run python and get:

>>> from tasks import add
>>> res=add.delay(4,4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Python/2.7/site-packages/celery/app/task.py", line 461, in delay
    return self.apply_async(args, kwargs)
  File "/Library/Python/2.7/site-packages/celery/app/task.py", line 573, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/Library/Python/2.7/site-packages/celery/app/base.py", line 354, in send_task
    reply_to=reply_to or self.oid, **options
  File "/Library/Python/2.7/site-packages/celery/app/amqp.py", line 310, in publish_task
    **kwargs
  File "/Library/Python/2.7/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 470, in _ensured
    interval_max)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 382, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "/Library/Python/2.7/site-packages/kombu/utils/__init__.py", line 246, in retry_over_time
    return fun(*args, **kwargs)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 250, in connect
    return self.connection
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 756, in connection
    self._connection = self._establish_connection()
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 711, in _establish_connection
    conn = self.transport.establish_connection()
  File "/Library/Python/2.7/site-packages/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/Library/Python/2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/Library/Python/2.7/site-packages/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/Library/Python/2.7/site-packages/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/Library/Python/2.7/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: timed out

I saw this question, but the error there seems to be different.
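
To rule out Celery itself, a bare connection attempt with kombu (the transport library visible in the traceback above) reproduces just the failing step. This is only a diagnostic sketch; the URL below is a placeholder and should be whatever broker address tasks.py points at:

from kombu import Connection

# Placeholder URL -- substitute the same broker address used in tasks.py.
broker_url = 'amqp://guest:guest@localhost:5672//'

# connect_timeout keeps the check short instead of hanging indefinitely.
with Connection(broker_url, connect_timeout=5) as conn:
    conn.connect()  # raises an error if the broker is unreachable
    print('broker reachable: %s' % conn.connected)

If this also times out, the broker is simply not reachable from the host at that address, independent of Celery.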

EDIT: I've added the RabbitMQ container logs (relevant parts):

    docker logs some-rabbit


              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: tty
  ######  ##        tty
  ##########
              Starting broker...

=INFO REPORT==== 20-Oct-2016::10:22:41 ===
Starting RabbitMQ 3.6.5 on Erlang 19.0.7
Copyright (C) 2007-2016 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/

=INFO REPORT==== 20-Oct-2016::10:22:41 ===
node           : rabbit@my-rabbit
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash    : AlUJAQFic5TGBPlUjyyIOw==
log            : tty
sasl log       : tty
database dir   : /var/lib/rabbitmq/mnesia/rabbit@my-rabbit

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Memory limit set to 799MB of 1999MB total.

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Disk free limit set to 50MB

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Limiting to approx 1048476 file handles (943626 sockets)

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
FHC read buffering:  OFF
FHC write buffering: ON

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
Database directory at /var/lib/rabbitmq/mnesia/rabbit@my-rabbit is empty. Initialising from scratch...

=INFO REPORT==== 20-Oct-2016::10:22:42 ===
    application: mnesia
    exited: stopped
    type: temporary

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Priority queues enabled, real BQ is rabbit_variable_queue

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Adding vhost '/'

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Creating user 'guest'

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Setting user tags for user 'guest' to [administrator]

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Setting permissions for 'guest' in '/' to '.*', '.*', '.*'

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
msg_store_transient: using rabbit_msg_store_ets_index to provide index

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
msg_store_persistent: using rabbit_msg_store_ets_index to provide index

=WARNING REPORT==== 20-Oct-2016::10:22:43 ===
msg_store_persistent: rebuilding indices from scratch

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
started TCP Listener on [::]:5672
 completed with 0 plugins.

=INFO REPORT==== 20-Oct-2016::10:22:43 ===
Server startup complete; 0 plugins started.

=INFO REPORT==== 20-Oct-2016::10:22:47 ===
accepting AMQP connection <0.353.0> (172.17.0.3:41166 -> 172.17.0.2:5672)

=INFO REPORT==== 20-Oct-2016::10:22:47 ===
accepting AMQP connection <0.360.0> (172.17.0.3:41168 -> 172.17.0.2:5672)

Upvotes: 1

Views: 2687

Answers (1)

ItayB

Reputation: 11337

Fixed. I followed this question and changed the broker IP address in tasks.py to localhost (because the RabbitMQ ports are now published to the host, so the broker behaves like a local process):

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost')

@app.task(name='tasks.add')
def add(x, y):
    return x + y
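
With the broker published on localhost, the same call from the question should now go through. A quick sanity check from a host Python shell (the explicit timeout is just a safeguard I added, not required):

>>> from tasks import add
>>> res = add.delay(4, 4)
>>> res.get(timeout=10)  # the worker in the container executes the task and returns the result
8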

Then I ran the containers with:

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

docker run -v /Users/user/hostpath:/home/user --link some-rabbit:rabbit --name some-celery -d celery

and added a celeryconfig.py file to /Users/user/hostpath:

CELERY_IMPORTS = ('tasks',)
CELERY_IGNORE_RESULT = False
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
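
For completeness: the worker has to load this module by name. If you run your own worker entry point instead of relying on the image's default command, a minimal sketch looks like this (the broker URL using the rabbit link alias is my assumption, not something taken from the image's documentation):

from celery import Celery

app = Celery('tasks', broker='amqp://guest@rabbit//')  # 'rabbit' is the --link alias visible inside the celery container
app.config_from_object('celeryconfig')  # picks up CELERY_IMPORTS, CELERY_RESULT_BACKEND, CELERY_ACCEPT_CONTENT, etc.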

and that works for me :)

Upvotes: 1
