Reputation: 3510
I'm getting this error when trying to run a Celery task in my Flask app:
celery.exceptions.TimeoutError: The operation timed out.
Here's my config file:
from celery import Celery
import os
from flask import Flask


def create_celery_app():
    celery = Celery(
        'celery-app',
        broker=os.environ.get('REDIS_URL', 'redis://localhost:6379/0'),
        backend=os.environ.get('REDIS_URL', 'redis://localhost:6379/0')
    )
    celery.conf.update(
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='America/Los_Angeles',
        enable_utc=True,
        task_track_started=True,
        imports=['tasks'],
        task_routes={
            'tasks.run_action_task': {'queue': 'default'},
            'tasks.add': {'queue': 'default'},
        }
    )
    return celery


celery_app = create_celery_app()


def init_celery(app: Flask):
    class ContextTask(celery_app.Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask


__all__ = ['celery_app', 'init_celery']
And the task definition:
from celery_config import celery_app
from flask import current_app


@celery_app.task
def add(x, y):
    try:
        current_app.logger.info(f"Starting add task with {x} and {y}")
        return 'Celery add task executed successfully! ' + str(x + y)
    except Exception as e:
        current_app.logger.error(f"Error in task add: {str(e)}")
        raise
The Celery app is initialized in app.py:
# Initialize Celery with Flask app context
init_celery(app)
The other odd thing is that the task appears in the log, but the code doesn't execute.
2024-11-14 22:30:35,600 - flower.command - INFO - Broker: redis://localhost:6379/0
2024-11-14 22:30:35,602 - flower.command - INFO - Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap',
'tasks.add',
'tasks.run_action_task',
'tasks.simple_test_task']
2024-11-14 22:30:35,610 - kombu.mixins - INFO - Connected to redis://localhost:6379/0
2024-11-14 22:30:47,063 - celery.utils.functional - DEBUG -
def add(x, y):
    return 1
I'm starting Celery with this command:
celery -A app:celery worker --loglevel=info
I have Redis and Flower running. Flower shows the worker running, but the tasks aren't being assigned to the worker. I'm running everything in a Python venv on Windows.
Upvotes: 0
Views: 30
Reputation: 3510
The combination of the command from u/BcK's answer (naming the queue) and this workaround for Windows solved the issue:
Celery is not sending task messages to Docker Redis, tasks stuck on "received"
The default value for Celery's --pool flag is prefork. Unfortunately, prefork is not supported on Windows, but the gevent pool does work. Just install gevent with pip install gevent and then add --pool=gevent to your celery worker command.
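Putting both fixes together, the worker invocation might look like this (a sketch that assumes the queue names and module layout from the other answer):

```shell
# Install the gevent pool (the default prefork pool is not supported on Windows)
pip install gevent

# Start the worker with the gevent pool, subscribed to both queues
celery --app flask_app.celery_app worker --loglevel=info -Q default,celery --pool=gevent
```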
Upvotes: 0
Reputation: 2821
You're instructing Celery to send the add task to the default queue. In that case, when you run a worker you need to give it the queue name it should subscribe to; otherwise it defaults to "celery".
celery --app flask_app.celery_app worker --loglevel=info -Q default,celery
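As a rough sketch of how the task_routes lookup decides each task's destination queue, here is the idea in plain Python, with no broker needed. Note that queue_for is an illustrative stand-in, not a real Celery API:

```python
# Simplified illustration of Celery's task_routes lookup: a task name is
# matched against the routes mapping to pick its destination queue, and
# unrouted tasks fall back to the default queue name, "celery".
task_routes = {
    "tasks.run_action_task": {"queue": "default"},
    "tasks.add": {"queue": "default"},
}


def queue_for(task_name, routes, fallback="celery"):
    route = routes.get(task_name)
    return route["queue"] if route else fallback


print(queue_for("tasks.add", task_routes))               # default
print(queue_for("tasks.simple_test_task", task_routes))  # celery
```

This is why the worker command above subscribes to both default and celery: your routed tasks land on default, while anything unrouted still reaches the worker via celery.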
You also have a circular dependency issue. Here's the file structure I used for your case:
# flask_app.py
from flask import Flask

from celery_app import celery_init_app

app = Flask(__name__)
celery_app = celery_init_app(app)


@app.route("/")
def index():
    from tasks import add

    add.delay(1, 2)
    return "Hello, World!"
# tasks.py
from flask import current_app

from flask_app import celery_app


@celery_app.task
def add(x, y):
    try:
        current_app.logger.info(f"Starting add task with {x} and {y}")
        return "Celery add task executed successfully! " + str(x + y)
    except Exception as e:
        current_app.logger.error(f"Error in task add: {str(e)}")
        raise
# celery_app.py
import os

from celery import Celery, Task
from flask import Flask


def create_celery_app(task_cls):
    celery = Celery(
        "celery-app",
        broker=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        backend=os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
        task_cls=task_cls,
    )
    celery.conf.update(
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        timezone="America/Los_Angeles",
        enable_utc=True,
        task_track_started=True,
        imports=["tasks"],
        task_routes={
            "tasks.run_action_task": {"queue": "default"},
            "tasks.add": {"queue": "default"},
        },
    )
    return celery


def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = create_celery_app(task_cls=FlaskTask)
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app
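The FlaskTask wrapper is the key piece here: it runs every task body inside the Flask application context, which is why current_app works inside add. The pattern can be mimicked in plain Python with a stand-in for app.app_context(). FakeApp below is purely illustrative, not Flask or Celery code:

```python
from contextlib import contextmanager


# Stand-in for Flask's app.app_context(): tracks whether a "context"
# is active, so code inside the task can rely on it (like current_app does).
class FakeApp:
    def __init__(self):
        self.context_active = False

    @contextmanager
    def app_context(self):
        self.context_active = True
        try:
            yield self
        finally:
            self.context_active = False


class Task:
    def run(self, *args, **kwargs):
        raise NotImplementedError


def make_context_task(app):
    # Mirrors FlaskTask: __call__ wraps run() in the app context.
    class ContextTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    return ContextTask


app = FakeApp()


class Add(make_context_task(app)):
    def run(self, x, y):
        assert app.context_active  # the context is live while run() executes
        return x + y


print(Add()(1, 2))  # 3
```

The same idea applies in reverse: if a task runs outside the wrapper, the context is never pushed, and anything that depends on current_app fails.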
Upvotes: 0