Reputation: 137
FastAPI app:
import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult
app = _fastapi.FastAPI()
celery_app = Celery(
"worker",
broker_url="amqp://guest:guest@rabbit:5672//",
result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)
@app.get("/{word}")
async def root(word: str):
task = celery_app.send_task("celery_worker.test_celery", args=[word])
return {"message": "Word received", "id": f"{task}"}
@app.get("/api/result/{task_id}")
async def result(task_id: str):
task = AsyncResult(task_id)
# Task Not Ready
if not task.ready():
return {"status": task.status}
# Task done: return the value
task_result= task.get()
result = task_result.get("result")
return {"task_id": str(task_id),
"status": task_result.get("status"),
"result": result,
}
Dockerfile:
FROM python:3.10-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
COPY ./requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt --no-cache-dir
COPY . .
docker-compose.yml:
version: '3.8'
services:
ylab:
container_name: ylab
build:
context: .
command: "uvicorn main:app --reload --host 0.0.0.0"
ports:
- "8000:8000"
networks:
- api_network
rabbit:
container_name: rabbit
image: rabbitmq:3.10.7-management
ports:
- "15672:15672"
- "5672:5672"
networks:
- api_network
celery_worker:
container_name: celery_worker
build:
context: .
command: celery -A main.celery_app worker --loglevel=INFO
networks:
- api_network
networks:
api_network:
name: api_network
The root() function works well. I can send messages, return a task id, and see all the messages in the RabbitMQ queue, but the result() function for any task id returns task.ready() == False
Can anyone tell me what is the error in this code?
Services info:
RabbitMQ 3.10.7
Celery:
celery@415bde516932 v5.2.3 (dawn-chorus)
Linux-5.10.0-18-amd64-x86_64-with-glibc2.31 2023-02-05 12:02:49
app: worker:0x7f3679306c20
transport: amqp://guest:**@rabbit:5672//
results: rpc://
concurrency: 8 (prefork)
task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
Upvotes: 0
Views: 1315
Reputation: 311988
According to the documentation for task_track_started
:
If True the task will report its status as ‘started’ when the task is executed by a worker.
But in your code, you don't seem to have anything consuming the tasks that you're placing on the queue. They will stay in PENDING state forever.
I started by writing your code to use automatic task routing, using <func>.delay
to call a task rather than the lower-level send_task
method:
import time
import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult
app = _fastapi.FastAPI()
celery_app = Celery(
"worker",
broker_url="amqp://guest:guest@rabbit:5672//",
result_backend="rpc://",
)
celery_app.conf.update(task_track_started=True)
@celery_app.task
def test_celery(word):
time.sleep(10)
return word.upper()
@app.get("/{word}")
async def root(word: str):
task = test_celery.delay(word)
return {"message": "Word received", "id": f"{task}"}
@app.get("/api/result/{task_id}")
async def result(task_id: str):
task = AsyncResult(task_id)
# Task Not Ready
if not task.ready():
return {"status": task.status}
# Task done: return the value
task_result= task.get()
return {"task_id": str(task_id),
"result": task_result,
}
When running the above code, a connection to /foo
results in:
{"message":"Word received","id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609"}
A subsequent call to /api/result/34bfe48d-6ab3-4dec-ad7d-aa567315a609
yields:
{"status":"STARTED"}
And if we wait for 10 seconds, the same request results in:
{"task_id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609","result":"FOO"}
We've demonstrated that things work correctly when using automatic task routing. So why isn't your original code working? There are three problems:
You don't have anything watch test-queue
.
You're delivering tasks into test-queue
, but your Celery worker is watching the default celery
queue. You need to use the -Q
argument to have it watch test-queue
instead:
celery_worker:
container_name: celery_worker
build:
context: .
command: celery -A main.celery_app worker --loglevel=INFO -Q test-queue
networks:
- api_network
You don't have any tasks defined.
If you add the -Q test-queue
argument from the previous step and restart the environment, attempts to connect to /foo
will result in the following traceback in your Celery worker:
celery_worker | [2023-02-05 14:12:40,864: ERROR/MainProcess] Received unregistered task of type 'celery_worker.test_celery'.
celery_worker | The message has been ignored and discarded.
[...]
celery_worker | Traceback (most recent call last):
celery_worker | File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
celery_worker | strategy = strategies[type_]
celery_worker | KeyError: 'celery_worker.test_celery'
We can fix that by registering the appropriate task with Celery:
@celery_app.task(name="celery_worker.test_celery")
def test_celery(word):
time.sleep(10)
return word.upper()
With the previous two changes, your code will successfully submit the task to Celery and Celery will pass it to the test_celery
function. However, calls to /api/result/<id>
will fail with:
File "/app/./main.py", line 39, in result
result = task_result.get("result")
AttributeError: 'str' object has no attribute 'get'
You need to to modiofy your result
function so that it looks more like:
@app.get("/api/result/{task_id}")
async def result(task_id: str):
task = AsyncResult(task_id)
# Task Not Ready
if not task.ready():
return {"status": task.status}
# Task done: return the value
task_result = task.get()
return {
"task_id": str(task_id),
"result": task_result,
}
With these three changes, your original code works as intended. The complete modified code looks like:
import time
import fastapi
from celery import Celery
from celery.result import AsyncResult
app = fastapi.FastAPI()
celery_app = Celery(
"worker",
broker_url="amqp://guest:guest@rabbit:5672//",
result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)
@celery_app.task(name="celery_worker.test_celery")
def test_celery(word):
time.sleep(10)
return word.upper()
@app.get("/{word}")
async def root(word: str):
task = celery_app.send_task("celery_worker.test_celery", args=[word])
return {"message": "Word received", "id": f"{task}"}
@app.get("/api/result/{task_id}")
async def result(task_id: str):
task = AsyncResult(task_id)
# Task Not Ready
if not task.ready():
return {"status": task.status}
# Task done: return the value
task_result = task.get()
return {
"task_id": str(task_id),
"result": task_result,
}
Upvotes: 3