Reputation: 31
I am integrating Celery with FastAPI, using RabbitMQ as the broker. Whenever I submit a task to Celery I get this error: "kombu.exceptions.OperationalError: [Errno 111] Connection refused". I don't understand it; maybe it's due to the connection with RabbitMQ, but when I start the Celery worker it doesn't give any connection error. The error only appears at task submission. Following is my code:
main.py
from fastapi import FastAPI
from scraper import crawl_data
from task import sample_task

app = FastAPI()

@app.get("/test")
def test():
    data = sample_task.delay()
    return {'MESSAGE': 'DONE'}
task.py
from celery_config import app
import time

@app.task
def sample_task():
    for i in range(1, 10):
        time.sleep(10)
    print("DONE TASK")
celery_config.py
from celery import Celery
app = Celery('celery_tutorial',
             broker="amqp://guest:guest@localhost:5672//",
             include=['task'])
docker-compose.yml
version: "3.9"
services:
  main_app:
    build:
      context: .
      dockerfile: fastapi.Dockerfile
    command: uvicorn main:app --host 0.0.0.0 --reload
    ports:
      - "8000:8000"
  rabbitmq:
    image: rabbitmq:3.8-management-alpine
    ports:
      - 15673:15672
    # celery_worker:
    #   build:
    #     context: .
    #     dockerfile: fastapi.Dockerfile
    #   command: celery -A celery worker --loglevel=info
    #   depends_on:
    #     - rabbitmq
    #     - main_app
    stdin_open: true
I start the FastAPI server and RabbitMQ with Docker Compose, and the Celery worker with the following command:
celery -A celery_config worker --loglevel=info
Upvotes: 1
Views: 1961
Reputation: 11357
Assuming your celery_config.py runs within the main_app container, the broker's host should be rabbitmq (the Compose service name) rather than localhost. Containers on the same Compose network reach each other by service name; inside the container, localhost refers to the container itself, where nothing is listening on port 5672, hence the "Connection refused":
app = Celery('celery_tutorial',
             broker="amqp://guest:guest@rabbitmq:5672/vhost",
             include=['task'])
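Since you also run a worker directly on the host, one option (a sketch, not part of the answer above; CELERY_BROKER_URL is an assumed variable name, not something Celery reads automatically) is to take the broker URL from an environment variable, defaulting to the in-Docker service name:

```python
import os

# Sketch: choose the broker URL from the environment so the same
# celery_config.py works both inside Docker (broker host "rabbitmq")
# and on the host machine (broker host "localhost").
def broker_url(default="amqp://guest:guest@rabbitmq:5672/vhost"):
    return os.environ.get("CELERY_BROKER_URL", default)

# In celery_config.py you would then build the app with:
# app = Celery('celery_tutorial', broker=broker_url(), include=['task'])
```

On the host you would export CELERY_BROKER_URL=amqp://guest:guest@localhost:5672/vhost before starting the worker; note that reaching RabbitMQ from the host also requires publishing port 5672 in the compose file (e.g. "5672:5672"), which the current file does not do.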
EDIT:
it seems you didn't set the relevant environment variables:
rabbitmq:
  image: rabbitmq:3.8-management-alpine
  ports:
    - 15673:15672
  environment:
    - RABBITMQ_DEFAULT_VHOST=vhost
    - RABBITMQ_DEFAULT_USER=guest
    - RABBITMQ_DEFAULT_PASS=guest
Make sure you add them; see my answer here.
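For completeness, a sketch of how the commented-out celery_worker service could be re-enabled so the worker also runs inside the Compose network (assuming the same image/Dockerfile as main_app, and that celery_config.py uses the rabbitmq host as shown above):

```yaml
  celery_worker:
    build:
      context: .
      dockerfile: fastapi.Dockerfile
    # -A must point at the module that defines the Celery app
    # (celery_config), not "celery" as in the commented-out block
    command: celery -A celery_config worker --loglevel=info
    depends_on:
      - rabbitmq
```

Note that depends_on only orders container startup, it does not wait for RabbitMQ to be ready; the Celery worker retries broker connections by default, so it will connect once RabbitMQ is up.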
Upvotes: 1