dowjones123

Reputation: 3837

Celery First Steps - timeout error on result.get()

I am following the Celery First Steps tutorial here: http://celery.readthedocs.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

I am following the tutorial as-is, with RabbitMQ.

When I call result.get(timeout=1), it raises a timeout error, even though it is a simple add operation and I can see the worker running and producing the correct result (8) in the other window.

(venv) C:\Volt\celerytest>ipython
Python 2.7.6 (default, Nov 10 2013, 19:24:18) [MSC v.1500 32 bit (Intel)]
Type "copyright", "credits" or "license" for more information.

IPython 2.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: from tasks import add

In [2]: a = add(1,3)

In [3]: a
Out[3]: 4

In [4]: a = add.delay(1,3)

In [5]: a.ready()
Out[5]: False

In [6]: a = add.delay(4,4)

In [7]: a.get(timeout=0.5)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
<ipython-input-7-2c407a92720e> in <module>()
----> 1 a.get(timeout=0.5)

C:\Users\Som\Envs\venv\lib\site-packages\celery\result.pyc in get(self, timeout, propagate, interval, no_ack, follow_parents)
    167                 interval=interval,
    168                 on_interval=on_interval,
--> 169                 no_ack=no_ack,
    170             )
    171         finally:

C:\Users\Som\Envs\venv\lib\site-packages\celery\backends\amqp.pyc in wait_for(self, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPAGATE_STATES, **kwargs)
    155                                     on_interval=on_interval)
    156             except socket.timeout:
--> 157                 raise TimeoutError('The operation timed out.')
    158
    159         if meta['status'] in PROPAGATE_STATES and propagate:

TimeoutError: The operation timed out.

In [8]:

tasks.py file

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://')


@app.task
def add(x, y):
    return x + y
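
The worker in the other window was started with the tutorial command:

celery -A tasks worker --loglevel=info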

worker log

[tasks]
  . tasks.add

[2014-07-17 13:00:33,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2014-07-17 13:00:33,211: INFO/MainProcess] mingle: searching for neighbors
[2014-07-17 13:00:34,220: INFO/MainProcess] mingle: all alone
[2014-07-17 13:00:34,240: WARNING/MainProcess] celery@SomsPC ready.
[2014-07-17 13:00:34,242: INFO/MainProcess] Received task: tasks.add[85ff75d8-38b5-442a-a574-c8b976a33739]
[2014-07-17 13:00:34,243: INFO/MainProcess] Task tasks.add[85ff75d8-38b5-442a-a574-c8b976a33739] succeeded in 0.000999927520752s: 4
[2014-07-17 13:00:46,582: INFO/MainProcess] Received task: tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a]
[2014-07-17 13:00:46,588: INFO/MainProcess] Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

Upvotes: 10

Views: 7809

Answers (4)

Eugene

Reputation: 61

I understand that I am late with the answer, but maybe it will help someone.

You just need to restart the already running worker after you have configured the backend. You can find information about this on the First Steps page, but only at the very end of the article.

Make sure that you don’t have any old workers still running.

It’s easy to start multiple workers by accident, so make sure that the previous worker is properly shut down before you start a new one.

An old worker that isn’t configured with the expected result backend may still be running and hijacking the tasks.
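
For example, before starting the newly configured worker you can check for and shut down any stale workers (assuming the tasks app from the question):

celery -A tasks status            # lists workers that are still responding
celery -A tasks control shutdown  # asks all running workers to shut down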

Upvotes: 0

sorrat

Reputation: 923

Sometimes I also received a TimeoutError with Redis, so I implemented a helper function:

import time

import redis

# Settings go on app.conf, not on the app object itself.
celery_app.conf.update(
    redis_socket_timeout=5,
    redis_socket_connect_timeout=5,
)


def run_task(task, *args, **kwargs):
    """Submit a task and keep retrying result.get() on Redis socket timeouts."""
    timeout = 2 * 60
    future = task.apply_async(args, kwargs)
    time_end = time.time() + timeout

    while True:
        try:
            return future.get(timeout=timeout)
        except redis.TimeoutError:
            if time.time() < time_end:
                continue
            raise
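
Usage with the add task from the question would look something like this (assuming the Redis result backend is configured):

result = run_task(add, 4, 4)  # returns 8, retrying on intermittent Redis timeouts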

Upvotes: 0

EML

Reputation: 455

If you look at this thread, it appears that setting --pool=solo also solves the issue. This works for me.
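
That is, start the worker along these lines (using the tasks module from the question):

celery -A tasks worker --loglevel=info --pool=solo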

Upvotes: 4

Sergey Petrunin

Reputation: 261

I experienced exactly the same problem after going through 'Celery First Steps'.

I think the reason for this is the suggested backend='amqp'.

The setup that worked for me is the following:

app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

According to the docs, when the AMQP result backend is used, each result can be retrieved only once (it is actually a single message in the queue).

I suppose your worker process retrieves it in order to print the result to the console:

Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

so you cannot retrieve the same result again.
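
Putting it together, the tasks.py from the question would then look something like this (note that the db+sqlite backend needs SQLAlchemy installed, and the worker has to be restarted after the backend change):

from celery import Celery

# broker stays on RabbitMQ; results go to a local SQLite database via SQLAlchemy
app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'


@app.task
def add(x, y):
    return x + y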

Upvotes: 10
