Reputation: 815
I'm struggling to get results from a Celery task. My app entry point looks like this:
from app import create_app, celery
celery.conf.task_default_queue = 'order_master'
order_app = create_app('../config.order_master.py')
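For context, the relevant parts of app/__init__.py presumably look roughly like the sketch below. The broker and rpc:// backend match the worker banner further down; the factory layout, credentials, and the ContextTask wrapper are assumptions:
# app/__init__.py -- minimal sketch, assuming a Flask app factory that shares a
# module-level Celery instance; credentials and structure are assumptions.
from celery import Celery
from flask import Flask

celery = Celery(__name__,
                broker='amqp://guest:guest@172.17.0.1:5672//',
                backend='rpc://')

def create_app(config_path):
    app = Flask(__name__)
    app.config.from_pyfile(config_path)

    # Run Celery tasks inside the Flask application context.
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return app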
Before starting the application, I start RabbitMQ and verify it has no queues:
root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
root@3d2e6b124780:/#
Now I start the application. After starting it, I still see no queues in RabbitMQ. When I trigger the task from the application with jobs.add_together.delay(2, 3),
I get the task ID:
ralfeus@web-2 /v/w/order (multiple-instances)> (order) curl localhost/test
{"result":"a2c07de4-f9f2-4b21-ae47-c6d92f2a7dfe"}
ralfeus@web-2 /v/w/order (multiple-instances)> (order)
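The /test endpoint presumably just enqueues the task and returns its id, roughly like this (the route module, blueprint name, and jobs import layout are assumptions):
# app/routes.py -- hypothetical dispatch route; it enqueues the task and
# returns the task id so the client can poll for the result later.
from flask import Blueprint, jsonify

from app import jobs

bp = Blueprint('test', __name__)

@bp.route('/test')
def start_task():
    task = jobs.add_together.delay(2, 3)  # lands on the default 'order_master' queue
    return jsonify({'result': task.id})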
At that moment I can see that my queue has one message:
root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
dd65ba89-cce9-3e0b-8252-c2216912a910 0
order_master 1
root@3d2e6b124780:/#
Now I start the Celery worker:
ralfeus@web-2 /v/w/order (multiple-instances)>
/usr/virtualfish/order/bin/celery -A main_order_master:celery worker --loglevel=INFO -n order_master -Q order_master --concurrency 2
INFO:app:Blueprints are registered
-------------- celery@order_master v5.0.0 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.0-51-generic-x86_64-with-glibc2.29 2020-10-22 16:38:56
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: app:0x7f374715c5b0
- ** ---------- .> transport: amqp://guest:**@172.17.0.1:5672//
- ** ---------- .> results: rpc://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> order_master exchange=order_master(direct) key=order_master
[tasks]
. app.jobs.add_together
. app.jobs.post_purchase_orders
[2020-10-22 16:38:57,263: INFO/MainProcess] Connected to amqp://guest:**@172.17.0.1:5672//
[2020-10-22 16:38:57,304: INFO/MainProcess] mingle: searching for neighbors
[2020-10-22 16:38:58,354: INFO/MainProcess] mingle: all alone
[2020-10-22 16:38:58,375: INFO/MainProcess] celery@order_master ready.
[2020-10-22 16:38:58,377: INFO/MainProcess] Received task: app.jobs.add_together[f855bec7-307d-4570-ab04-3d036005a87b]
[2020-10-22 16:40:38,616: INFO/ForkPoolWorker-2] Task app.jobs.add_together[f855bec7-307d-4570-ab04-3d036005a87b] succeeded in 100.13561034202576s: 5
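The task itself is presumably something like the sketch below; the sleep is only an assumption to account for the ~100 s runtime shown in the log above:
# app/jobs.py -- sketch of the task module; everything here is assumed from
# the task names listed in the worker banner.
import time

from app import celery

@celery.task
def add_together(x, y):
    time.sleep(100)  # simulate a slow job
    return x + y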
So the worker clearly picked up the task, executed it, and produced a result. However, I can't retrieve the result. Instead, when I request it, I get the following:
curl localhost/test/f855bec7-307d-4570-ab04-3d036005a87b
{"state":"PENDING"}
ralfeus@web-2 /v/w/order (multiple-instance)> (order)
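For reference, the status endpoint presumably resolves the id with AsyncResult, roughly like this (hypothetical route, continuing the routes sketch above; Celery reports ids it cannot find in the result backend as PENDING):
# app/routes.py (continued) -- hypothetical status route.
from flask import jsonify

from app import celery

@bp.route('/test/<task_id>')
def get_result(task_id):
    result = celery.AsyncResult(task_id)
    if result.ready():
        return jsonify({'state': result.state, 'result': result.get()})
    return jsonify({'state': result.state})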
If I check the queues now, I see this:
root@3d2e6b124780:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
dd65ba89-cce9-3e0b-8252-c2216912a910 1
65d80661-6195-3986-9fa2-e468eaab656e 0
celeryev.9ca5a092-9a0c-4bd5-935b-f5690cf9665b 0
order_master 0
celery@order_master.celery.pidbox 0
root@3d2e6b124780:/#
The queue dd65ba89-cce9-3e0b-8252-c2216912a910 has one message which, as I checked, contains the result. So why did it end up there, and how do I get it? All the manuals say I just need to fetch the task by its ID, but in my case the task is still in the PENDING state.
Upvotes: 7
Views: 3486
Reputation: 815
According to the Celery documentation:
RPC Result Backend (RabbitMQ/QPid)
The RPC result backend (rpc://) is special as it doesn’t actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once, and only by the client that initiated the task. Two different processes can’t wait for the same result.
So the rpc:// backend isn't suitable for retrieving results later from a different request or process.
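A minimal sketch of switching to a persistent result backend instead, so any process can fetch a result by id later. Redis is used here purely as an example; a database backend (e.g. via SQLAlchemy) works the same way, and the URLs are assumptions:
# Sketch: point the result backend at persistent storage instead of rpc://.
from celery import Celery

celery = Celery('app',
                broker='amqp://guest:guest@172.17.0.1:5672//',
                backend='redis://localhost:6379/0')
celery.conf.task_default_queue = 'order_master'

# Any process can now resolve a result later by id:
# result = celery.AsyncResult('f855bec7-307d-4570-ab04-3d036005a87b')
# print(result.state, result.result)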
Upvotes: 15