radix

Reputation: 188

SQLAlchemy and Celery structure issue in Python

I have two files:

app.py

from flask import Flask
from flask_restful import Api
from celery import Celery
from resources.item import Item, ItemList, ItemInsert
from db import db

app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = ""
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

app.config['CELERY_BROKER_URL'] = ''
app.config['CELERY_RESULT_BACKEND'] = ''

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

app.secret_key = ""
api = Api(app)

@app.before_first_request
def create_tables():
    db.create_all()

api.add_resource(ItemList,"/items")
api.add_resource(Item,"/item/<int:id>")
api.add_resource(ItemInsert,"/items")

@celery.task
def update_row(data):
    pass

if __name__ == "__main__":
    db.init_app(app)
    app.run(debug=True,port=5000)

item.py

from flask_restful import Resource, reqparse
from flask import request
from models.item import ItemModel

class ItemInsert(Resource):
    def post(self):
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

As you can see, in app.py I have imported classes from item.py. However, the call to the Celery task update_row in item.py is now left hanging, because I cannot import it from app.py without causing a cyclic import. Is there any solution?

Upvotes: 0

Views: 216

Answers (2)

inf581

Reputation: 642

celery_app.task is a decorator, i.e. just a regular Python function. It works like this: it takes your function, registers it in the Celery app and returns a wrapper object with methods such as delay and apply_async. You can always get a registered task back from the celery_app.tasks dictionary by its name. Another trick to avoid circular imports is to store the celery_app reference as an attribute of the Flask app; inside a request context you can always get the current Flask app from flask.current_app.
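As a minimal sketch of that registry mechanism (the app name "demo_app", the memory broker and the task name "add" are placeholders, not part of the original code):

from celery import Celery

demo_app = Celery("demo_app", broker="memory://")

@demo_app.task(name="add")      # registers the function and returns a task wrapper
def add(x, y):
    return x + y

# the wrapper has .delay/.apply_async, and the same task can be
# fetched back from the registry by its name
registered = demo_app.tasks["add"]
print(registered.name)          # -> "add"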

app.py

from tasks import register_tasks
...
app = Flask(__name__)
...
app.celery = Celery(app.name, ...)
register_tasks(app.celery)

tasks.py

def update_row(data):
    pass

def register_tasks(celery_app):
    # called once from app.py: registers update_row on the celery app under an explicit name
    celery_app.task(update_row, name="update_row")

views.py

from flask import current_app
from flask_restful import Resource

class ItemInsert(Resource):
    def post(self):
        update_row = current_app.celery.tasks["update_row"]
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

UPD: indeed the most canonical way is to use autodiscovery of tasks:

myapp/tasks.py

from celery import shared_task

@shared_task
def update_row(data):
    pass

myapp/app.py

celery_app = Celery(...)
celery_app.set_default()  # shared_task functions will bind to this app
celery_app.autodiscover_tasks(["myapp"], force=True)  # imports myapp.tasks and registers its tasks

myapp/views.py

from .tasks import update_row

def index_view():
    update_row.delay(...)
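With this layout you can point the worker at the module that creates the Celery app, for example with celery -A myapp.app:celery_app worker --loglevel=info (assuming the myapp package is importable from where you start the worker), and autodiscovery will register the tasks from myapp/tasks.py.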

Upvotes: 1

Toan Quoc Ho

Reputation: 3378

With a simple project you can implement the tasks inside app.py, as you are doing now. But in a more complicated project it is better to move the task definitions into a separate package, which mitigates the cyclic import.

Like so:

App and celery configuration

# app.py
# App & celery
# ...
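The answer leaves app.py elided; as a rough sketch based only on the code already shown in the question (project_name is a placeholder package name), the relevant part might look like:

# project_name/app.py (sketch, not part of the original answer)
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = ''   # fill in your broker URL

# the celery instance that tasks.py imports
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])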

Tasks definitions

# tasks.py
from project_name.app import celery

@celery.task
def update_row(data):
    pass

API

# resources/item.py
from project_name.tasks import update_row
# ...

Separating the tasks into another package (a tasks package, which is auto-discovered by Celery) helps you prevent the cyclic import and also keeps the code easier to maintain.
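For reference, the layout described above would look roughly like this (project_name being whatever your package is actually called):

project_name/
    app.py            # Flask app and Celery instance
    tasks.py          # task definitions
    resources/
        item.py       # API resources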


But if you still want to use the current approach, then to prevent the cyclic import you could import the task dynamically when the API is called:

# resources/item.py
# ...
class ItemInsert(Resource):
    def post(self):
        # imported at call time to avoid the circular import at module load
        from project_name.app import update_row
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

Upvotes: 1
