Ando

Reputation: 325

How to connect to flask-sqlalchemy database from inside a RQ job

Using flask-sqlalchemy, how can I connect to the database from within an RQ job? The database connection is created in create_app with: db = SQLAlchemy(app)

I call a job from a route:

@app.route("/record_occurrences")
def query_library():
    # Pass the callable and its argument separately so that RQ runs it in the worker
    job = queue.enqueue(ApiQueryService, word)

Then, inside the RQ job, I want to update the database:

class ApiQueryService(object):
    def __init__(self,word):
        resp = call_api()
        db.session.query(Model).filter_by(id=word.id).update({"count":resp[1]})

I can't find a way to access the db. I've tried importing it with from app import db. I tried storing it in g. I tried reinstantiating it with SQLAlchemy(app), and several other things, but none of these work. When I was using sqlite, all of this worked, and I could easily connect to the db from any module with a get_db method that simply called sqlite3.connect(). Is there some simple way to access it with SQLAlchemy that's similar to that?

Upvotes: 2

Views: 1784

Answers (1)

miquelvir

Reputation: 1757

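This can be solved using the application factory pattern, as mentioned by @vulpxn. An RQ worker runs in its own process, outside any Flask request, so no application context exists there and db.session has no app to bind to. With a factory, the job can build the app and push a context itself.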

Let's assume we have our configuration class somewhere like this:

import os

class Config(object):
    DEBUG = False
    TESTING = False
    DEVELOPMENT = False

    API_PAGINATION = 10

    PROPAGATE_EXCEPTIONS = True  # needed due to Flask-Restful not passing them up

    SQLALCHEMY_TRACK_MODIFICATIONS = False  # ref: https://stackoverflow.com/questions/33738467/how-do-i-know-if-i-can-disable-sqlalchemy-track-modifications/33790196#33790196

class ProductionConfig(Config):
    CSRF_COOKIE_SAMESITE = 'Strict'
    SESSION_PROTECTION = "strong"
    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = 'Strict'

    SECRET_KEY = "super-secret"
    INVITES_SECRET = "super-secret"
    PASSWORD_RESET_SECRET = "super-secret"
    PUBLIC_VALIDATION_SECRET = "super-secret"

    FRONTEND_SERVER_URL = "https://127.0.0.1:4999"

    SQLALCHEMY_DATABASE_URI = "sqlite:///%s" % os.path.join(os.path.abspath(os.path.dirname(__file__)), "..",
                                                            "people.db")

We create our app factory:

from flask_sqlalchemy import SQLAlchemy
from flask import Flask
from development.config import ProductionConfig  # assumes the config classes above live in this module
from rq import Queue
from email_queue.worker import conn

db = SQLAlchemy()

q = Queue(connection=conn)

def init_app(config=ProductionConfig):
    # app creation
    app = Flask(__name__)

    app.config.from_object(config)

    # plugin initialization
    db.init_app(app)
   
    with app.app_context():
        # adding blueprints
        from .blueprints import api
        app.register_blueprint(api, url_prefix='/api/v1')

        return app

We will now be able to start our app using the app factory:

import centrifuga4  # the package that defines init_app above

app = centrifuga4.init_app()
if __name__ == "__main__":
    with app.app_context():
        app.run()

We can also call the factory inside the RQ job and push an application context before touching the database:

def my_job():
    app = init_app()  
    with app.app_context():
        return something_using_sqlalchemy()

Upvotes: 1
