Andrea Zanotti

Reputation: 51

flask-sqlalchemy - how to obtain a request-independent db session

I am looking for the best (and correct) way to obtain a request-independent db session.

The problem is the following: I am building a web application that has to access the database. The exposed endpoint accepts a request, performs some initial work, then creates a thread (which will do the heavy work), starts it, and replies to the client with a unique id for the "job". Meanwhile the thread continues its work (which requires database access), and the client can poll to check the status. I am not using a dedicated framework for this background job, only a plain thread. Only one background thread can run at any time, which is why I keep the state in a singleton.

The application is created with the application factory design https://flask.palletsprojects.com/en/1.1.x/patterns/appfactories/

I am using Gunicorn as the WSGI server and SQLite as the database.

The basic structure of the code is the following (I have removed the business logic and imports, but the concept remains):

api_jobs.py


@bp.route('/jobs', methods=['POST'])
def create_job():

    data = request.get_json(force=True) or {}
    name = data['name']

    job_controller = JobController() # This is a singleton
    job_process = job_controller.start_job(name)
    job_process_dict = job_process.to_dict()

    return jsonify(job_process_dict)

controller.py


class Singleton(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

class JobController(object):
    __metaclass__ = Singleton

    def __init__(self):
        self.job_thread = None

    def start_job(self, name):
        if self.job_thread is not None:
            job_id = self.job_thread.job_id
            job_process = JobProcess.query.get(job_id)
            if job_process.status != 'end':
                raise ValueError('A job process is already ongoing!')
            else:
                self.job_thread = None

        job_process = JobProcess(name)
        db.session.add(job_process)
        db.session.commit() # At this step I create the ID

        self.job_thread = JobThread(db.session, job_process.id)
        self.job_thread.start()

        return job_process

class JobThread(threading.Thread):
    def __init__(self, db_session, job_id):
        threading.Thread.__init__(self)  # required before start() can be called
        self.job_id = job_id
        self.db_session = db_session
        # Note: __init__ runs in the thread that creates the JobThread
        self.session = self.db_session()

    def run(self):
        self.job_process = self.session.query(JobProcess).get(self.job_id)
        self.job_process.status = 'working'
        self.session.commit()
        i = 0
        while True:
            sleep(1)
            print('working hard')
            i += 1
            if i > 10:
                break
        self.job_process.status = 'end'
        self.session.commit()
        self.db_session.remove()

models.py

class JobProcess(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    status = db.Column(db.String(64))
    name = db.Column(db.String(64))

    def to_dict(self):
        data = {
            'id': self.id,
            'status': self.status,
            'name': self.name,
        }
        return data

From my understanding, calling self.session = self.db_session() actually does nothing (because SQLAlchemy uses a registry that is also a proxy, if I am not wrong); however, that was the best attempt I found to create a "new/detached/useful" session.
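To illustrate the registry behaviour described above, here is a simplified, standard-library-only sketch. It is not SQLAlchemy's implementation: ThreadLocalRegistry is a hypothetical stand-in, and plain object instances play the role of sessions. It only shows the general idea that calling a thread-scoped registry returns the same object within one thread and a different object in another thread.

```python
import threading


class ThreadLocalRegistry:
    """Hypothetical stand-in for a thread-scoped session registry (illustration only)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Create the object on first use in the calling thread, then reuse it.
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value

    def remove(self):
        # Discard only the calling thread's object.
        if hasattr(self._local, "value"):
            del self._local.value


registry = ThreadLocalRegistry(object)

# Two calls from the same thread return the same object.
main_obj = registry()
same_thread_again = registry()

# A call from another thread returns a different object.
seen_in_worker = []
worker = threading.Thread(target=lambda: seen_in_worker.append(registry()))
worker.start()
worker.join()
```

If the real registry behaves along these lines, then where the call happens matters: a call made inside a thread's __init__ runs in the thread that constructs the object, not in the new thread, so it would return the caller's session.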

I checked out https://docs.sqlalchemy.org/en/13/orm/contextual.html#using-thread-local-scope-with-web-applications in order to obtain a request-independent db session; however, even the suggested method of creating a new session factory (sessionmaker + scoped_session) does not work.

I get several different errors depending on slight changes to the code; in this configuration the error is:

DetachedInstanceError: Instance <JobProcess at 0x7f875f81c350> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: http://sqlalche.me/e/bhk3)

The basic question remains: is it possible to create a session that lives inside the thread and that I am responsible for creating and tearing down?

Upvotes: 1

Views: 792

Answers (1)

EAW

Reputation: 952

You are encountering the DetachedInstanceError because you are passing the session from your main thread to your job thread. SQLAlchemy uses thread-local storage to manage sessions, so a single session cannot be shared between two threads. You just need to create a new session inside the run method of your job thread, and close it there when the work is done.
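A minimal sketch of that idea, using plain SQLAlchemy rather than Flask-SQLAlchemy and assuming SQLAlchemy 1.4 or later (for sqlalchemy.orm.declarative_base and Session.get). The temporary SQLite file, SessionFactory, and the start_job helper are hypothetical names chosen for illustration. The key points are that only the job id (a plain integer) crosses the thread boundary, and that the session is created and closed inside run().

```python
import os
import tempfile
import threading

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class JobProcess(Base):
    __tablename__ = "job_process"
    id = Column(Integer, primary_key=True)
    status = Column(String(64))
    name = Column(String(64))


# Assumption: a file-based SQLite database, so that connections opened
# in different threads all see the same data.
db_path = os.path.join(tempfile.mkdtemp(), "jobs.db")
engine = create_engine("sqlite:///" + db_path)
Base.metadata.create_all(engine)
SessionFactory = sessionmaker(bind=engine)


class JobThread(threading.Thread):
    def __init__(self, session_factory, job_id):
        super().__init__()
        # Store the factory and the id only; no session is created here,
        # because __init__ still runs in the caller's thread.
        self.session_factory = session_factory
        self.job_id = job_id

    def run(self):
        # The session is created, used, and closed inside the job thread.
        session = self.session_factory()
        try:
            job = session.get(JobProcess, self.job_id)
            job.status = "working"
            session.commit()
            # ... the long-running work would go here ...
            job.status = "end"
            session.commit()
        finally:
            session.close()


def start_job(name):
    # Hypothetical helper standing in for the request handler's work.
    session = SessionFactory()
    try:
        job = JobProcess(name=name, status="created")
        session.add(job)
        session.commit()
        job_id = job.id  # copy the plain integer before closing the session
    finally:
        session.close()
    thread = JobThread(SessionFactory, job_id)
    thread.start()
    return job_id, thread
```

In a Flask-SQLAlchemy application, the factory could presumably be bound to the application's engine (db.engine, obtained within an application context) instead of a separately created one; that part is an assumption and is not shown here.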

Upvotes: 1
