Reputation: 9
I'm developing a multi-threaded application with an SQLite database. I've done some research and it seems that SqliteQueueDatabase can deliver the required concurrency handling. I've dug through the documentation, but I haven't managed to see the whole picture of how to start and initialize the database.
from peewee import *
from playhouse.sqliteq import SqliteQueueDatabase

db = SqliteQueueDatabase(':memory:')

class Prime(Model):
    num = IntegerField()

    class Meta:
        database = db

db.start()
db.connect()
db.create_tables([Prime])
print(db.get_tables())  # prints []
db.stop()
After starting the database in the above example, I try to create the table for my model, but it does not get created. What am I missing? I tried to find a peewee & SqliteQueueDatabase example that covers the whole lifecycle, but was unable to.
Upvotes: 0
Views: 1494
Reputation: 9
What I ended up doing was not using peewee, SqliteQueueDatabase, or any ORM at all, but plain sqlite3 and threading.
Using a sort of singleton trick, I had basically one instance of an object with a connection property, so a single connection instance was shared by all the threads. I had to set check_same_thread=False when connecting to SQLite, because otherwise different threads can't share the same connection.
Below is a simplified version:
import sqlite3
import threading

class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

class Dao(object, metaclass=Singleton):
    def __init__(self, conf=None):
        self.lock = threading.Lock()
        self.conn = sqlite3.connect(
            conf.db,
            check_same_thread=False
        )
The first time, I instantiated the Dao class in the main thread and passed a configuration to __init__. Later, whichever thread needed to use the database just created a Dao object; thanks to the singleton trick, the caller simply got a reference to the already existing instance, which included the already established connection.
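A minimal sketch of how that sharing plays out (here the Dao takes the database path directly instead of a conf object, just to keep the example self-contained):

```python
import sqlite3
import threading

class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

class Dao(metaclass=Singleton):
    def __init__(self, db_path=None):
        self.lock = threading.Lock()
        self.conn = sqlite3.connect(db_path, check_same_thread=False)

# The main thread creates the instance with the real configuration...
main_dao = Dao(':memory:')

# ...worker threads just call Dao() and get the same object back,
# including the already established connection.
results = []

def worker():
    results.append(Dao() is main_dao)

t = threading.Thread(target=worker)
t.start()
t.join()
print(results)  # [True]
```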
I added all my DB operations as methods to this Dao class. To avoid repeating the locking code, I created decorators using wraps.
from functools import wraps

def transaction_read_write(fn):
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        self.lock.acquire()
        try:
            # execute wrapped method and perform commit
            ret = fn(self, *args, **kwargs)
            self.conn.commit()
        except Exception:
            # perform rollback in case of an error
            # (in a real-world application, also do some logging here)
            self.conn.rollback()
            raise
        finally:
            # release acquired lock
            self.lock.release()
        return ret
    return wrapper
I had a similar wrapper for read-only operations, but without commit/rollback. In that one I made it configurable whether to perform locking during read-only operations -- just to be able to adjust the locking behaviour without a new release, should a DB concurrency issue occur in production.
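The read-only wrapper isn't shown above; a minimal sketch of what it might look like (the `transaction_read` name and the `LOCK_READ_OPERATIONS` flag are my own inventions -- in a real application the flag would come from configuration):

```python
import threading
from functools import wraps

# Hypothetical switch: in a real application this would be read from
# configuration, so read locking can be toggled without a new release.
LOCK_READ_OPERATIONS = True

def transaction_read(fn):
    """Wrapper for read-only operations: no commit/rollback needed."""
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        if LOCK_READ_OPERATIONS:
            # serialize reads with writes, same lock as the write wrapper
            with self.lock:
                return fn(self, *args, **kwargs)
        # locking disabled: run the SELECT without taking the lock
        return fn(self, *args, **kwargs)
    return wrapper
```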
Now all I had to do was add my custom transaction decorators to the Dao methods. (In real life a transaction can of course consist of several SQL commands. I therefore had some atomic methods without a transaction decorator which were never called directly from outside; they were called only by other Dao methods that performed several calls within a single transaction, so those composite methods carried the transaction decorators. I was very careful about how large and how long-running these transactions were, because in my case the locking mechanism basically prevented the other threads from working with the DB at the same time.)
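A sketch of that layering, with the decorator repeated and a made-up jobs/files schema so it runs standalone (the class and method names are hypothetical):

```python
import sqlite3
import threading
from functools import wraps

def transaction_read_write(fn):
    @wraps(fn)
    def wrapper(self, *args, **kwargs):
        with self.lock:
            try:
                ret = fn(self, *args, **kwargs)
                self.conn.commit()
            except Exception:
                self.conn.rollback()
                raise
            return ret
    return wrapper

class JobDao:
    def __init__(self):
        self.lock = threading.Lock()
        self.conn = sqlite3.connect(':memory:', check_same_thread=False)
        self.conn.execute("CREATE TABLE jobs (job_id INTEGER, num_files INTEGER)")
        self.conn.execute("CREATE TABLE files (job_id INTEGER, name TEXT)")
        self.conn.commit()

    # Atomic helpers: no decorator, never called from outside the class.
    def _insert_job(self, job_id):
        self.conn.execute(
            "INSERT INTO jobs (job_id, num_files) VALUES (?, 0)", (job_id,))

    def _insert_file(self, job_id, name):
        self.conn.execute(
            "INSERT INTO files (job_id, name) VALUES (?, ?)", (job_id, name))

    # Composite method: one lock acquisition and one commit
    # for the whole unit of work.
    @transaction_read_write
    def create_job(self, job_id, files):
        self._insert_job(job_id)
        for name in files:
            self._insert_file(job_id, name)
```

If any of the inner calls raises, the whole unit is rolled back together, which is the point of decorating only the composite method.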
So a Dao method could look something like this:
@transaction_read_write
def set_processed_files(self, id, num_files):
    cur = self.conn.cursor()
    cur.execute("UPDATE jobs SET num_files = ? WHERE job_id = ?", (num_files, id))
In the end I decided not to use peewee, but I hope there is something useful in my example.
Upvotes: 0
Reputation: 21
I've run into this problem, and it seems that the read query completes before create_tables() does.

My fix is to throw in back-to-back db.stop() and db.start() calls, forcing code execution to wait until all database write queries have completed.
from peewee import *
from playhouse.sqliteq import SqliteQueueDatabase

db = SqliteQueueDatabase('db.sqlite3')

class Prime(Model):
    num = IntegerField()

    class Meta:
        database = db

db.start()
db.connect()
db.create_tables([Prime])
db.stop()   # Wait for create_tables() to complete
db.start()  # Continue execution
print(db.get_tables())
db.stop()
Upvotes: 2
Reputation: 26235
You're using an in-memory database. In-memory databases use a connection per thread, and each connection gets its own private database. So unless you specifically use the shared-memory mode (you'll have to consult the SQLite docs), you are out of luck.
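For reference, SQLite's shared-cache URI mode is what lets several connections in the same process see one in-memory database; a sketch with plain sqlite3 (no peewee involved):

```python
import sqlite3

# Two separate connections to the *same* in-memory database via
# SQLite's shared-cache URI. Without cache=shared, each connection
# to ':memory:' would get its own private, empty database.
uri = 'file::memory:?cache=shared'
a = sqlite3.connect(uri, uri=True)
b = sqlite3.connect(uri, uri=True)

a.execute("CREATE TABLE prime (num INTEGER)")
a.execute("INSERT INTO prime (num) VALUES (7)")
a.commit()

# The second connection sees the table created by the first.
print(b.execute("SELECT num FROM prime").fetchone())  # (7,)
```

Note that such a shared in-memory database lives only as long as at least one connection to it stays open.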
Use a file-based database and your example will run fine.
Upvotes: 2