Evgeny Romensky

FastAPI, RabbitMQ, Celery: what's wrong with the code?

FastAPI app:

import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult


app = _fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)


@app.get("/{word}")
async def root(word: str):
    task = celery_app.send_task("celery_worker.test_celery", args=[word])
    return {"message": "Word received", "id": f"{task}"}


@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)
    # Task Not Ready
    if not task.ready():
        return {"status": task.status}
    # Task done: return the value
    task_result = task.get()
    result = task_result.get("result")
    return {
        "task_id": str(task_id),
        "status": task_result.get("status"),
        "result": result,
    }

Dockerfile:

FROM python:3.10-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
COPY ./requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt --no-cache-dir
COPY . .

docker-compose.yml:

version: '3.8'
services:
  ylab:
    container_name: ylab
    build:
      context: .
    command: "uvicorn main:app --reload --host 0.0.0.0"
    ports:
      - "8000:8000"
    networks:
      - api_network
  rabbit:
    container_name: rabbit
    image: rabbitmq:3.10.7-management
    ports:
      - "15672:15672"
      - "5672:5672"
    networks:
      - api_network
  celery_worker:
    container_name: celery_worker
    build:
      context: .
    command: celery -A main.celery_app worker --loglevel=INFO
    networks:
      - api_network

networks:
  api_network:
    name: api_network

The root() function works well: I can send messages, get a task id back, and see all the messages in the RabbitMQ queue. But for any task id, the result() function finds task.ready() == False.

Can anyone tell me what's wrong with this code?

Services info:

RabbitMQ 3.10.7

Celery:

celery@415bde516932 v5.2.3 (dawn-chorus)

Linux-5.10.0-18-amd64-x86_64-with-glibc2.31 2023-02-05 12:02:49

app: worker:0x7f3679306c20

transport: amqp://guest:**@rabbit:5672//

results: rpc://

concurrency: 8 (prefork)

task events: OFF (enable -E to monitor tasks in this worker)

[queues]

.> celery exchange=celery(direct) key=celery

Answers (1)

larsks

According to the documentation for task_track_started:

If True the task will report its status as ‘started’ when the task is executed by a worker.

So a task only reports STARTED once a worker actually executes it. In your code, though, nothing seems to be consuming the tasks you're placing on the queue, so they will stay in the PENDING state forever.
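
To see why ready() never becomes true, here is a toy, stdlib-only model of Celery's state semantics (a sketch, not Celery itself). It assumes two documented behaviors: any task id the result backend has no record of is reported as PENDING, and ready() is true only for the finished states SUCCESS, FAILURE and REVOKED (the members of celery.states.READY_STATES). The `results` dict is a hypothetical stand-in for the result backend.

```python
# Toy model (not Celery): a plain dict stands in for the result backend.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})  # same members as celery.states.READY_STATES


def status(results: dict, task_id: str) -> str:
    # Celery treats any task id it has no record of as PENDING.
    return results.get(task_id, "PENDING")


def ready(results: dict, task_id: str) -> bool:
    # Only finished states count as "ready".
    return status(results, task_id) in READY_STATES


results = {}  # no worker has touched the task yet
print(status(results, "abc"), ready(results, "abc"))  # PENDING False

results["abc"] = "STARTED"  # only reported with task_track_started=True, once a worker picks it up
print(status(results, "abc"), ready(results, "abc"))  # STARTED False

results["abc"] = "SUCCESS"
print(status(results, "abc"), ready(results, "abc"))  # SUCCESS True
```

A task that no worker ever consumes never leaves the first case, which is exactly what result() kept reporting.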

I started by rewriting your code to use automatic task routing, calling the task with <func>.delay rather than the lower-level send_task method:

import time

import fastapi as _fastapi
from celery import Celery
from celery.result import AsyncResult

app = _fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.update(task_track_started=True)

@celery_app.task
def test_celery(word):
    time.sleep(10)
    return word.upper()


@app.get("/{word}")
async def root(word: str):
    task = test_celery.delay(word)
    return {"message": "Word received", "id": f"{task}"}


@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)

    # Task Not Ready
    if not task.ready():
        return {"status": task.status}

    # Task done: return the value
    task_result = task.get()
    return {
        "task_id": str(task_id),
        "result": task_result,
    }

With the above code running, a request to /foo returns:

{"message":"Word received","id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609"}

A subsequent call to /api/result/34bfe48d-6ab3-4dec-ad7d-aa567315a609 yields:

{"status":"STARTED"}

After waiting 10 seconds, the same request returns:

{"task_id":"34bfe48d-6ab3-4dec-ad7d-aa567315a609","result":"FOO"}
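
The request sequence above can be scripted. The sketch below polls the result endpoint until the response contains a "result" key, as the final response does. `wait_for_result` and `http_fetch` are hypothetical helper names, and the default base URL assumes the "8000:8000" port mapping from the question's docker-compose.yml. The fetch function is injectable so the polling loop can be exercised without a running server.

```python
import json
import time
import urllib.request


def http_fetch(path: str, base: str = "http://localhost:8000") -> dict:
    # Hypothetical helper: GET base + path and parse the JSON body.
    with urllib.request.urlopen(base + path) as resp:
        return json.load(resp)


def wait_for_result(task_id, fetch=http_fetch, interval=1.0, timeout=30.0, sleep=time.sleep):
    """Poll /api/result/<task_id> until the payload carries a "result" key."""
    deadline = time.monotonic() + timeout
    path = f"/api/result/{task_id}"
    while True:
        payload = fetch(path)
        if "result" in payload:  # finished: {"task_id": ..., "result": ...}
            return payload["result"]
        if time.monotonic() >= deadline:  # still {"status": "PENDING"} or {"status": "STARTED"}
            raise TimeoutError(f"task {task_id} is still {payload.get('status')}")
        sleep(interval)
```

Against the running stack, `wait_for_result(http_fetch("/foo")["id"], timeout=20)` should return "FOO" after roughly 10 seconds; a task stuck in PENDING ends in a TimeoutError instead of looping forever.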

We've demonstrated that things work correctly when using automatic task routing. So why isn't your original code working? There are three problems:

  1. Nothing is watching test-queue.

    You're delivering tasks into test-queue, but your Celery worker is watching the default celery queue. You need to use the -Q argument to have it watch test-queue instead:

    celery_worker:
      container_name: celery_worker
      build:
        context: .
      command: celery -A main.celery_app worker --loglevel=INFO -Q test-queue
      networks:
        - api_network
    
  2. You don't have any tasks defined.

    If you add the -Q test-queue argument from the previous step and restart the environment, requests to /foo will produce the following traceback in your Celery worker's log:

    celery_worker    | [2023-02-05 14:12:40,864: ERROR/MainProcess] Received unregistered task of type 'celery_worker.test_celery'.
    celery_worker    | The message has been ignored and discarded.
    [...]
    celery_worker    | Traceback (most recent call last):
    celery_worker    |   File "/usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    celery_worker    |     strategy = strategies[type_]
    celery_worker    | KeyError: 'celery_worker.test_celery'
    

    We can fix that by registering a task under that name with Celery:

    @celery_app.task(name="celery_worker.test_celery")
    def test_celery(word):
        time.sleep(10)
        return word.upper()
    
  3. With the previous two changes, your code will successfully submit the task to Celery and Celery will pass it to the test_celery function. However, calls to /api/result/<id> will fail with:

      File "/app/./main.py", line 39, in result
        result = task_result.get("result")
    AttributeError: 'str' object has no attribute 'get'
    

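    task.get() returns whatever the task function returned, here a plain string, and a str has no .get() method. You need to modify your result function so that it looks more like: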

    @app.get("/api/result/{task_id}")
    async def result(task_id: str):
        task = AsyncResult(task_id)
    
        # Task Not Ready
        if not task.ready():
            return {"status": task.status}
    
        # Task done: return the value
        task_result = task.get()
        return {
            "task_id": str(task_id),
            "result": task_result,
        }
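
The three problems can be reproduced without Docker using a toy, in-memory model of the message flow. This is a sketch, not Celery's implementation: `ToyBroker`, `ToyWorker` and `run` are hypothetical names. The model assumes, as described above, that a task name listed in task_routes is delivered to its routed queue while unrouted tasks go to the default celery queue, that a worker only consumes the queues it was started with (-Q), that a message for an unregistered task name is discarded (leaving its state PENDING), and that a finished task stores whatever the function returned.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for RabbitMQ: queue name -> FIFO of messages."""

    def __init__(self, routes):
        self.routes = routes  # like celery_app.conf.task_routes
        self.queues = defaultdict(deque)

    def send_task(self, task_id, name, args):
        queue = self.routes.get(name, "celery")  # unrouted tasks use the default queue
        self.queues[queue].append((task_id, name, args))
        return queue


class ToyWorker:
    """Consumes only the queues it watches, like `celery worker -Q ...`."""

    def __init__(self, broker, watched, registry, results):
        self.broker = broker
        self.watched = watched  # queue names passed via -Q
        self.registry = registry  # task name -> function, like @celery_app.task(name=...)
        self.results = results  # task id -> (state, value), stands in for the result backend
        self.log = []

    def drain(self):
        for queue in self.watched:
            messages = self.broker.queues[queue]
            while messages:
                task_id, name, args = messages.popleft()
                func = self.registry.get(name)
                if func is None:
                    # Same outcome as Celery's "Received unregistered task ... discarded".
                    self.log.append(f"unregistered task {name!r} discarded")
                    continue
                self.results[task_id] = ("SUCCESS", func(*args))


def run(watched, registry):
    broker = ToyBroker({"celery_worker.test_celery": "test-queue"})
    results = {}
    broker.send_task("t1", "celery_worker.test_celery", ["foo"])
    worker = ToyWorker(broker, watched, registry, results)
    worker.drain()
    state = results.get("t1", ("PENDING", None))  # unknown ids read as PENDING
    return state, len(broker.queues["test-queue"]), worker.log


tasks = {"celery_worker.test_celery": lambda word: word.upper()}
print(run(["celery"], {}))  # original setup: message still sitting in test-queue
print(run(["test-queue"], {}))  # fix 1 only: message consumed, then discarded
print(run(["test-queue"], tasks))  # fixes 1 and 2: the task runs

(state, value), _, _ = run(["test-queue"], tasks)
print(type(value).__name__)  # str: value.get("result") would raise AttributeError
```

The first two runs both end in PENDING, which matches what result() reported; only the third completes, and its stored value is the plain string the task returned.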
    

With these three changes, your original code works as intended. The complete modified code looks like:

import time

import fastapi
from celery import Celery
from celery.result import AsyncResult


app = fastapi.FastAPI()

celery_app = Celery(
    "worker",
    broker_url="amqp://guest:guest@rabbit:5672//",
    result_backend="rpc://",
)
celery_app.conf.task_routes = {"celery_worker.test_celery": "test-queue"}
celery_app.conf.update(task_track_started=True)


@celery_app.task(name="celery_worker.test_celery")
def test_celery(word):
    time.sleep(10)
    return word.upper()


@app.get("/{word}")
async def root(word: str):
    task = celery_app.send_task("celery_worker.test_celery", args=[word])
    return {"message": "Word received", "id": f"{task}"}


@app.get("/api/result/{task_id}")
async def result(task_id: str):
    task = AsyncResult(task_id)

    # Task Not Ready
    if not task.ready():
        return {"status": task.status}

    # Task done: return the value
    task_result = task.get()
    return {
        "task_id": str(task_id),
        "result": task_result,
    }
