Reputation: 6796
I'm having trouble understanding how to open and close database sessions efficiently. As I understand the SQLAlchemy documentation, if I use scoped_session to construct my Session object and then use the returned Session object to create sessions, it is thread-safe: every thread gets its own session, and there won't be problems with it.

The example below works. I put it in an infinite loop to see whether it closes the sessions properly, and if I monitored it correctly (in MySQL, by executing "SHOW PROCESSLIST;"), the connections just keep growing. They are not closed, even though I call session.close() and even remove the scoped_session object at the end of each run. What am I doing wrong?

My goal in a larger application is to use the minimum number of database connections required, because my current working implementation creates a new session in every method that needs one and closes it before returning, which seems inefficient.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count

        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)

                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurs we skip it
                    continue
                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)

            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()

            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()
Upvotes: 26
Views: 24990
Reputation: 156278
You should only call create_engine and scoped_session once per process (per database). Each gets its own pool of connections or sessions (respectively), so you want to make sure you create only one pool. In the question's code, every MTWorker instance builds a new engine, and with it a new connection pool, which is why the connection count keeps growing. Just make it a module-level global. If you need to manage your sessions more precisely than that, you probably shouldn't be using scoped_session.
Another change to make is to use DBSession directly, as though it were a session. Calling session methods on the scoped_session will transparently create a thread-local session, if needed, and forward the method call to that session.
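As a rough illustration of that pattern, here is a minimal sketch of worker threads that call session methods on the registry itself and discard their thread-local session when they finish. It is written for Python 3 (hence `queue` rather than `Queue`), and several things in it are assumptions made for the sake of a runnable example: the in-memory SQLite URL stands in for the MySQL URL, a trivial `SELECT` stands in for the `MyModel` query, and `worker` and `run_workers` are hypothetical names.

```python
import queue
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Assumption: in-memory SQLite stands in for the MySQL URL from the question.
engine = create_engine("sqlite://")
# Created once at module level and shared by every thread.
DBSession = scoped_session(sessionmaker(bind=engine))


def worker(index, task_queue, results, sessions):
    # Calling the registry returns this thread's session, creating it if needed.
    sessions[index] = DBSession()
    try:
        while True:
            try:
                task_id = task_queue.get(False)
            except queue.Empty:
                return
            try:
                # Method calls on the registry are forwarded to the thread-local session.
                value = DBSession.execute(text("SELECT :n"), {"n": task_id}).scalar()
                DBSession.commit()
                results.append(value)
            finally:
                task_queue.task_done()
    finally:
        # Close and discard this thread's session, releasing its connection.
        DBSession.remove()


def run_workers(worker_count, task_ids):
    task_queue = queue.Queue()
    for task_id in task_ids:
        task_queue.put(task_id)
    results, sessions = [], {}
    threads = [
        threading.Thread(target=worker, args=(i, task_queue, results, sessions))
        for i in range(worker_count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, sessions


results, sessions = run_workers(worker_count=3, task_ids=range(10))
```

Each thread ends up with its own session object even though all of them go through the same `DBSession` name, and `DBSession.remove()` in the `finally` block takes the place of the explicit `db_session.close()` calls in the question.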
Another thing to be aware of is the pool_size of the connection pool, which is 5 by default. For many applications that's fine, but if you are creating lots of threads, you might need to tune that parameter.
DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'

db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=db_engine
    )
)


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
# snip
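For the pool_size tuning mentioned earlier, a sketch of the engine configuration might look like the following. The specific numbers are illustrative assumptions, not recommendations. The SQLite URL and the explicit `poolclass=QueuePool` are also assumptions, used only so the snippet runs without a MySQL server; with a MySQL URL, QueuePool is already the default pool and `poolclass` can be omitted.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Assumption: SQLite stands in for the MySQL URL; QueuePool is requested
# explicitly because SQLite engines use a different pool by default.
db_engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=20,      # connections kept open in the pool (default is 5)
    max_overflow=10,   # extra connections allowed beyond pool_size (default is 10)
    pool_timeout=30,   # seconds a thread waits for a free connection
    echo=False,
)

# The pool reports its configured size; no connection has been opened yet.
configured_size = db_engine.pool.size()
```

With these settings at most 30 connections can be open at once, so with 50 worker threads some of them would wait for a connection to be returned to the pool rather than opening new ones.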
Upvotes: 49