Reputation: 188
I have two files:
app.py
from flask import Flask
from flask_restful import Api
from celery import Celery
from resources.item import Item, ItemList, ItemInsert
from db import db
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = ""
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config['CELERY_BROKER_URL'] = ''
app.config['CELERY_RESULT_BACKEND'] = ''
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
app.secret_key = ""
api = Api(app)
@app.before_first_request
def create_tables():
    db.create_all()
api.add_resource(ItemList,"/items")
api.add_resource(Item,"/item/<int:id>")
api.add_resource(ItemInsert,"/items")
@celery.task
def update_row(data):
    pass
if __name__ == "__main__":
    db.init_app(app)
    app.run(debug=True, port=5000)
item.py
from flask_restful import Resource, reqparse
from flask import request
from models.item import ItemModel
class ItemInsert(Resource):
    def post(self):
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201
As you can see, app.py imports classes from item.py. However, the Celery task call in item.py (update_row) is now left hanging: I cannot import update_row from app.py, because that would result in a cyclic import. Is there any solution?
Upvotes: 0
Views: 216
Reputation: 642
celery_app.task is a decorator, so it is just a regular Python function. It works as follows: it takes your function, registers it in the Celery app, and returns a wrapper object with methods such as delay and apply_async. You can always get a registered task by name from the celery_app.tasks dictionary. Another trick for avoiding circular imports is to store the celery_app reference as an attribute of flask_app; inside a request context you can always get the current Flask app from flask.current_app.
app.py
from tasks import register_tasks
...
app = Flask(__name__)
...
app.celery = Celery(app.name, ...)
register_tasks(app.celery)
tasks.py
def update_row(data):
    pass
def register_tasks(celery_app):
    celery_app.task(update_row, name="update_row")
views.py
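from flask import current_app
from flask_restful import Resource
class ItemInsert(Resource):
    def post(self):
        # Look the task up by its registered name instead of importing it from app.py
        update_row = current_app.celery.tasks["update_row"]
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201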
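To make the "register by name, look up later" idea from this answer concrete, here is a minimal sketch that uses only the standard library. ToyApp and ToyTask are hypothetical stand-ins invented for this illustration; they are not Celery classes and do not reflect how Celery is implemented internally. The toy delay runs the function inline, whereas a real task queue would send a message to a broker.

```python
# Toy model of "register a function under a name, fetch it later by name".
# ToyApp and ToyTask are hypothetical names for illustration, NOT Celery APIs.


class ToyTask:
    """Wrapper returned by ToyApp.task; mimics the shape of a task object."""

    def __init__(self, func, name):
        self.func = func
        self.name = name

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # A real queue would enqueue the call; this sketch just runs it inline.
        return self.func(*args, **kwargs)


class ToyApp:
    def __init__(self):
        self.tasks = {}  # name -> ToyTask

    def task(self, func=None, *, name=None):
        def register(f):
            wrapper = ToyTask(f, name or f.__name__)
            self.tasks[wrapper.name] = wrapper
            return wrapper

        # Supports both `@app.task` and `app.task(func, name=...)`.
        return register(func) if func is not None else register


# "tasks.py": a plain function that imports nothing from the app module.
def update_row(data):
    return {"updated": data}


# "app.py": create the app and register the function under a name.
toy_app = ToyApp()
toy_app.task(update_row, name="update_row")

# "views.py": needs only a reference to the app and the task name.
task = toy_app.tasks["update_row"]
result = task.delay({"id": 1})
```

The point of the design is that the view code depends only on the app object and a string, so it never has to import the module that defines the app.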
UPD: indeed the most canonical way is to use autodiscovery of tasks:
myapp/tasks.py
from celery import shared_task
@shared_task
def update_row(data):
    pass
myapp/app.py
celery_app = Celery(...)
celery_app.set_default()
celery_app.autodiscover_tasks(["myapp"], force=True)
myapp/views.py
from .tasks import update_row
def index_view():
    update_row.delay(...)
Upvotes: 1
Reputation: 3378
For a simple project, you can define the tasks inside app.py, as you are doing now. For a more complicated project, it is better to move the task definitions into a separate package, which helps avoid the cyclic import.
Like so:
App and Celery configuration
# app.py
# App & celery
# ...
Task definitions
# tasks.py
from project_name.app import celery
@celery.task
def update_row(data):
    pass
API
# resources/item.py
from project_name.tasks import update_row
# ...
Separating the tasks into another package (a tasks package, which Celery can autodiscover) helps prevent cyclic imports and also makes the code easier to maintain. Note that this only breaks the cycle if the module that creates the celery instance does not import the resources before celery is defined.
If you still want to keep the current approach, you can prevent the cyclic import by importing the task dynamically when the API is called:
# resources/item.py
# ...
class ItemInsert(Resource):
    def post(self):
        from project_name.app import update_row
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201
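As a rough illustration of why the deferred import helps, the sketch below writes two pairs of throwaway modules to a temporary directory and imports them. The module names (cyc_app, cyc_res, ok_app, ok_res) and the insert_item function are made up for this demo and are not part of the question's project; no Flask or Celery is involved. The first pair imports each other at module level and fails; the second pair moves one import inside the function body, so it runs only after both modules are fully loaded.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical demo modules (names invented for this sketch).
MODULES = {
    # Cyclic pair: both imports happen at module level.
    "cyc_app.py": (
        "from cyc_res import insert_item\n"
        "def update_row(data):\n"
        "    return f'updated {data}'\n"
    ),
    "cyc_res.py": (
        "from cyc_app import update_row\n"
        "def insert_item(data):\n"
        "    return update_row(data)\n"
    ),
    # Working pair: the import back into the app is deferred to call time.
    "ok_app.py": (
        "from ok_res import insert_item\n"
        "def update_row(data):\n"
        "    return f'updated {data}'\n"
    ),
    "ok_res.py": (
        "def insert_item(data):\n"
        "    from ok_app import update_row  # deferred import\n"
        "    return update_row(data)\n"
    ),
}


def run_demo():
    with tempfile.TemporaryDirectory() as tmp:
        for filename, source in MODULES.items():
            Path(tmp, filename).write_text(source)
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            # Module-level cycle: cyc_res asks for update_row before
            # cyc_app has finished executing, so the import fails.
            try:
                importlib.import_module("cyc_app")
                cyclic_error = None
            except ImportError as exc:
                cyclic_error = exc
            # Deferred import: ok_app is fully loaded by the time
            # insert_item runs, so the lookup succeeds.
            ok_app = importlib.import_module("ok_app")
            result = ok_app.insert_item("row-1")
        finally:
            sys.path.remove(tmp)
    return cyclic_error, result


cyclic_error, result = run_demo()
```

The trade-off is that the dependency is hidden inside the function, which is why moving the tasks into their own module is usually the cleaner long-term fix.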
Upvotes: 1