Reputation: 3837
I am following the Celery First Steps tutorial here : http://celery.readthedocs.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results
I am following with the tutorial as is, with RabbitMQ.
When I am doing result.get(timeout=1), it is showing a timeout error, even though it is a simple add operation, and I can see the worker running and producing correct result (of 8) in the other window
(venv) C:\Volt\celerytest>ipython
Python 2.7.6 (default, Nov 10 2013, 19:24:18) [MSC v.1500 32 bit (Intel)]
Type "copyright", "credits" or "license" for more information.
IPython 2.1.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: from tasks import add
In [2]: a = add(1,3)
In [3]: a
Out[3]: 4
In [4]: a = add.delay(1,3)
In [5]: a.ready()
Out[5]: False
In [6]: a = add.delay(4,4)
In [7]: a.get(timeout=0.5)
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-7-2c407a92720e> in <module>()
----> 1 a.get(timeout=0.5)
C:\Users\Som\Envs\venv\lib\site-packages\celery\result.pyc in get(self, timeout,
propagate, interval, no_ack, follow_parents)
167 interval=interval,
168 on_interval=on_interval,
--> 169 no_ack=no_ack,
170 )
171 finally:
C:\Users\Som\Envs\venv\lib\site-packages\celery\backends\amqp.pyc in wait_for(se
lf, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPA
GATE_STATES, **kwargs)
155 on_interval=on_interval)
156 except socket.timeout:
--> 157 raise TimeoutError('The operation timed out.')
158
159 if meta['status'] in PROPAGATE_STATES and propagate:
TimeoutError: The operation timed out.
In [8]:
tasks.py file
from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')
@app.task
def add(x, y):
return x + y
worker log
[tasks]
. tasks.add
[2014-07-17 13:00:33,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0
.1:5672//
[2014-07-17 13:00:33,211: INFO/MainProcess] mingle: searching for neighbors
[2014-07-17 13:00:34,220: INFO/MainProcess] mingle: all alone
[2014-07-17 13:00:34,240: WARNING/MainProcess] celery@SomsPC ready.
[2014-07-17 13:00:34,242: INFO/MainProcess] Received task: tasks.add[85ff75d8-38
b5-442a-a574-c8b976a33739]
[2014-07-17 13:00:34,243: INFO/MainProcess] Task tasks.add[85ff75d8-38b5-442a-a5
74-c8b976a33739] succeeded in 0.000999927520752s: 4
[2014-07-17 13:00:46,582: INFO/MainProcess] Received task: tasks.add[49de7c6b-96
72-485d-926e-a4e564ccc89a]
[2014-07-17 13:00:46,588: INFO/MainProcess] Task tasks.add[49de7c6b-9672-485d-92
6e-a4e564ccc89a] succeeded in 0.00600004196167s: 8
Upvotes: 10
Views: 7809
Reputation: 61
I understand that I am late with the answer, but maybe it will help someone.
You just need to restart the already running worker after you have configured the backend. You can find information about this on the First Steps page, but only at the very end of the article.
Make sure that you don’t have any old workers still running.
It’s easy to start multiple workers by accident, so make sure that the previous worker is properly shut down before you start a new one.
An old worker that isn’t configured with the expected result backend may be running and is hijacking the tasks.
Upvotes: 0
Reputation: 923
Sometimes I also received TimeoutError
with redis, so I implemented helper function:
celery_app.update(
redis_socket_timeout=5,
redis_socket_connect_timeout=5,
)
def run_task(task, *args, **kwargs):
timeout = 2 * 60
future = task.apply_async(args, kwargs)
time_end = time.time() + timeout
while True:
try:
return future.get(timeout=timeout)
except redis.TimeoutError:
if time.time() < time_end:
continue
raise
Upvotes: 0
Reputation: 455
If you look at this thread it appears that setting --pool=solo
also solves the issue. This works for me.
Upvotes: 4
Reputation: 261
I experience exactly the same problem after going through 'Celery First Steps'.
I think the reason for this is suggested backend='amqp'
.
Setup that worked for me is the following:
app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
According to docs, when AMQP results back-end is used, each result can be retrieved only once (it's actually a single message in the query).
I suppose, your worker process retrieve it in order to print the result to console:
Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8
so you failed to retrieve the same result again.
Upvotes: 10