Reputation: 25
I get the error below when attempting to run a Celery task with `.delay()`.
Error:
127.0.0.1 - - [06/Aug/2024 02:21:01] "GET /test_route HTTP/1.1" 500 -
Traceback (most recent call last):
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/utils/functional.py", line 32, in __call__
return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 472, in _reraise_as_library_errors
yield
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 459, in _ensure_connection
return retry_over_time(
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
return fun(*args, **kwargs)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 934, in _connection_factory
self._connection = self._establish_connection()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 860, in _establish_connection
conn = self.transport.establish_connection()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/transport/pyamqp.py", line 203, in establish_connection
conn.connect()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/amqp/connection.py", line 324, in connect
self.transport.connect()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/amqp/transport.py", line 129, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/amqp/transport.py", line 184, in _connect
self.sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask/app.py", line 2551, in __call__
return self.wsgi_app(environ, start_response)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask/app.py", line 2531, in wsgi_app
response = self.handle_exception(e)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask_restful/__init__.py", line 298, in error_router
return original_handler(e)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask/app.py", line 2528, in wsgi_app
response = self.full_dispatch_request()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask/app.py", line 1825, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask_restful/__init__.py", line 298, in error_router
return original_handler(e)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask/app.py", line 1823, in full_dispatch_request
rv = self.dispatch_request()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/flask/app.py", line 1799, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/mnt/d/mad/Library-Management-System-V2/application/user.py", line 256, in test_func
t = say_hello.delay()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/celery/app/task.py", line 444, in delay
return self.apply_async(args, kwargs)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/celery/app/task.py", line 594, in apply_async
return app.send_task(
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/celery/app/base.py", line 799, in send_task
amqp.send_task_message(P, name, message, **options)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/celery/app/amqp.py", line 518, in send_task_message
ret = producer.publish(
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/messaging.py", line 186, in publish
return _publish(
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 556, in _ensured
return fun(*args, **kwargs)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/messaging.py", line 195, in _publish
channel = self.channel
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/messaging.py", line 218, in _get_channel
channel = self._channel = channel()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/utils/functional.py", line 34, in __call__
value = self.__value__ = self.__contract__()
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/messaging.py", line 234, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 953, in default_channel
self._ensure_connection(**conn_opts)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 458, in _ensure_connection
with ctx():
File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
self.gen.throw(typ, value, traceback)
File "/mnt/d/mad/Library-Management-System-V2/venv2/lib/python3.10/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 111] Connection refused
My Project Structure:
proj
├── application
│ ├── worker.py
│ ├── backend_jobs.py
│ └── user.py
├── app.py
├── celery_config.py
└── config.py
celery_config.py
# Redis database 1 carries the task messages; database 2 stores task results.
broker_url = "redis://localhost:6379/1"
result_backend = "redis://localhost:6379/2"

# IANA zone names are case-sensitive on case-sensitive file systems, so the
# canonical spelling "Asia/Kolkata" is required (not "Asia/kolkata").
timezone = "Asia/Kolkata"

# Keep retrying the broker connection while the worker is starting up.
broker_connection_retry_on_startup = True
config.py
class Config:
    """Base settings shared by every environment."""

    DEBUG = False
    TESTING = False


class DevelopmentConfig(Config):
    """Settings for local development; overrides ``DEBUG`` to ``True``."""

    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///lms_v2.sqlite3'
    # NOTE(review): the secret key and salt are hard-coded here; load them
    # from the environment before using this outside local development.
    SECRET_KEY = "itissecretofcourse"
    SECURITY_PASSWORD_SALT = "nowyourpasswordissalted"
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    WTF_CSRF_ENABLED = False
    SECURITY_TOKEN_AUTHENTICATION_HEADER = 'Authentication-Token'
app.py
from flask import Flask
from flask_cors import CORS
from flask_security import Security
from config import DevelopmentConfig
from application.model import *
from application.api import api
from application.worker import celery_init_app
def create_app():
    """Build the Flask application and wire up its extensions.

    Loads ``DevelopmentConfig``, initialises ``db`` and ``api`` against the
    new app, stores a ``Security`` instance on ``app.security``, and imports
    the ``application.login``, ``application.admin`` and ``application.user``
    modules while an application context is active.

    Returns:
        The configured Flask application.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(DevelopmentConfig)

    # Bind the extensions to this application instance.
    db.init_app(flask_app)
    api.init_app(flask_app)
    flask_app.security = Security(flask_app, datastore)

    # Imported for their import-time side effects (user.py registers a route
    # this way; presumably login and admin do the same -- verify).
    with flask_app.app_context():
        import application.login
        import application.admin
        import application.user

    # CORS is currently disabled: CORS(flask_app)
    return flask_app
# Module-level Flask application and the Celery app built from it.
app = create_app()
celery_app = celery_init_app(app)

if __name__ == '__main__':
    # Run Flask's built-in server with debugging enabled.
    app.run(debug=True)
worker.py
from celery import Celery, Task
def celery_init_app(app):
    """Create a Celery app for *app* and install it as the default Celery app.

    Args:
        app: The Flask application whose context each task should run in.

    Returns:
        The Celery application, configured from the ``celery_config`` module.
    """

    class FlaskTask(Task):
        """Task base class that runs the task body inside ``app``'s context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object("celery_config")

    # ``@shared_task`` tasks look up "the current app" when they are called.
    # Without this, ``.delay()`` from a Flask request thread falls back to
    # Celery's built-in default app, which ignores ``celery_config`` and tries
    # the default AMQP broker -- hence kombu's "Connection refused" through
    # the pyamqp transport even though ``broker_url`` points at Redis.
    celery_app.set_default()
    return celery_app
backend_jobs.py
from celery import shared_task
@shared_task(ignore_result=False)
def say_hello():
    """Return a fixed greeting string."""
    greeting = "Hello world"
    return greeting
user.py
from application.backend_jobs import *
@app.get('/test_route')
def test_func():
    """Queue ``say_hello`` and respond with the queued task's id as JSON."""
    async_result = say_hello.delay()
    payload = {"task_id": async_result.id}
    return jsonify(payload)
Please help me fix this error.
Upvotes: 1
Views: 90