Reputation: 18299
I'm attempting to create an in-memory sqlite3 cache to store oauth tokens, but am running into issues regarding multithreading. After running several tests, I've noticed the behavior differs substantially from non-in-memory databases and multithreading.
Notably, reader threads immediately fail with "table is locked" if a writer thread has written without committing. This is true with multiple threads even with isolation_level=None
.
It's not simply that readers are blocked until the transaction is complete, but rather they fail immediately, regardless of timeout
or PRAGMA busy_timeout = 10000
.
The only way I can get it working is to set isolation_level=None
and to do PRAGMA read_uncommitted=TRUE
. I would rather not do this, however.
Is it possible to let the reader threads simply wait the lock instead of immediately failing?
import sqlite3 import threading def get_conn(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None): uri = 'file:%s' % name if is_memory: uri = uri + '?mode=memory&cache=shared' conn = sqlite3.connect(uri, uri=True, timeout=timeout, isolation_level=isolation_level) if pragmas is None: pragmas = [] if not isinstance(pragmas, list): pragmas = [pragmas] for pragma in pragmas: conn.execute(pragma) return conn def work1(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1): conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas) for i in range(loops): conn.execute('INSERT INTO foo VALUES (1)') def work2(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1): conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas) for i in range(loops): len(conn.execute('SELECT * FROM foo').fetchall()) def main(name, is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None, loops=1, num_threads=16): conn = get_conn(name, is_memory=is_memory, timeout=timeout, isolation_level=isolation_level, pragmas=pragmas) try: conn.execute('CREATE TABLE foo(a int)') except sqlite3.OperationalError: conn.execute('DROP TABLE foo') conn.execute('CREATE TABLE foo(a int)') threads = [] for i in range(num_threads): threads.append(threading.Thread(target=work1, args=(name, is_memory, timeout, isolation_level, pragmas, loops))) threads.append(threading.Thread(target=work2, args=(name, is_memory, timeout, isolation_level, pragmas, loops))) for thread in threads: thread.start() for thread in threads: thread.join() # In-Memory Tests # All of these fail immediately with table is locked. There is no delay; timeout/busy_timeout has no effect. main('a', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=None) main('b', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=None) main('c', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=None) main('d', is_memory=True, timeout=5, isolation_level=None, pragmas=None) main('e', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000']) main('f', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000']) main('g', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000']) main('h', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000']) main('i', is_memory=True, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE']) main('j', is_memory=True, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE']) main('k', is_memory=True, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE']) # This is the only successful operation, when isolation_level = None and PRAGMA read_uncommitted=TRUE main('l', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE']) # These start to take a really long time main('m', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100) main('n', is_memory=True, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128) # None of the on disk DB's ever fail: main('o', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=None) main('p', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=None) main('q', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=None) main('r', is_memory=False, timeout=5, isolation_level=None, pragmas=None) main('s', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA busy_timeout = 10000']) main('t', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA busy_timeout = 10000']) main('u', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA busy_timeout = 10000']) main('v', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA busy_timeout = 10000']) main('w', is_memory=False, timeout=5, isolation_level='IMMEDIATE', pragmas=['PRAGMA read_uncommitted=TRUE']) main('x', is_memory=False, timeout=5, isolation_level='DEFERRED', pragmas=['PRAGMA read_uncommitted=TRUE']) main('y', is_memory=False, timeout=5, isolation_level='EXCLUSIVE', pragmas=['PRAGMA read_uncommitted=TRUE']) main('z', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE']) # These actually fail with database is locked main('aa', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100) main('ab', is_memory=False, timeout=5, isolation_level=None, pragmas=['PRAGMA read_uncommitted=TRUE'], loops=100, num_threads=128)
Upvotes: 1
Views: 423
Reputation: 8975
I do not believe that the SQLite3 interface is meant to be re-entrant. I think that each thread would have to obtain a mutex, perform the query, and then release the mutex. Attempt to perform only one database operation at a time. (Python's API-layer would not be expected to do this, as there would ordinarily be no need for any such thing.)
Upvotes: 0