daniels
daniels

Reputation: 19233

Python, SQLite and threading

I'm working on an application that will gather data through HTTP from several places, cache the data locally and then serve it through HTTP.

So I was looking at the following. My application will first create several threads that will gather data at a specified interval and cache that data locally into a SQLite database.

Then in the main thread start a CherryPy application that will query that SQLite database and serve the data.

My problem is: how do I handle connections to the SQLite database from my threads and from the CherryPy application?

If I'd do a connection per thread to the database will I also be able to create/use an in memory database?

Upvotes: 9

Views: 17324

Answers (6)

PirateApp
PirateApp

Reputation: 6260

This test is being done to determine the best way to write and read from SQLite database. We follow 3 approaches below

  1. Read and write without any threads (the methods with the word normal on it)
  2. Read and write with Threads
  3. Read and write with processes

Our sample dataset is a dummy generated OHLC dataset with a symbol, timestamp, and 6 fake values for ohlc and volumefrom, volumeto

Reads

  1. Normal method takes about 0.25 seconds to read
  2. Threaded method takes 10 seconds
  3. Processing takes 0.25 seconds to read

Winner: Processing and Normal

Writes

  1. Normal method takes about 1.5 seconds to write
  2. Threaded method takes about 30 seconds
  3. Processing takes about 30 seconds

Winner: Normal

Note: All records are not written using the threaded and processed write methods. Threaded and processed write methods obviously run into database locked errors as the writes are queued up SQlite only queues up writes to a certain threshold and then throws sqlite3.OperationalError indicating database is locked. The ideal way is to retry inserting the same chunk again but there is no point as the method execution for parallel insertion takes more tine than a sequential read even without retrying the locked/failed inserts Without retrying, 97% of the rows were written and still took 10x more time than a sequential write

Strategies to takeaway:

  1. Prefer reading SQLite and writing it in the same thread

  2. If you must do multithreading, use multiprocessing to read which has more or less the same performance and defer to single threaded write operations

  3. DO NOT USE THREADING for reads and writes as it is 10x slower on both, you can thank the GIL for that

Here is the code for the complete test

import sqlite3
import time
import random
import string
import os
import timeit
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threading
import os

database_file = os.path.realpath('../files/ohlc.db')

create_statement = 'CREATE TABLE IF NOT EXISTS database_threading_test (symbol TEXT, ts INTEGER, o REAL, h REAL, l REAL, c REAL, vf REAL, vt REAL, PRIMARY KEY(symbol, ts))'
insert_statement = 'INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)'
select = 'SELECT * from database_threading_test'

def time_stuff(some_function):
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        value = some_function(*args, **kwargs)
        print(timeit.default_timer() - t0, 'seconds')
        return value
    return wrapper

def generate_values(count=100):
    end = int(time.time()) - int(time.time()) % 900
    symbol = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
    ts = list(range(end - count * 900, end, 900))
    for i in range(count):
        yield (symbol, ts[i], random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1000, random.random() * 1e9, random.random() * 1e5)

def generate_values_list(symbols=1000,count=100):
    values = []
    for _ in range(symbols):
        values.extend(generate_values(count))
    return values

@time_stuff
def sqlite_normal_read():
    """

    100k records in the database, 1000 symbols, 100 rows
    First run
    0.25139795300037804 seconds
    Second run

    Third run
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            conn.execute(create_statement)
            results = conn.execute(select).fetchall()
            print(len(results))
    except sqlite3.OperationalError as e:
        print(e)

@time_stuff
def sqlite_normal_write():
    """
    1000 symbols, 100 rows
    First run
    2.279409104000024 seconds
    Second run
    2.3364172020001206 seconds
    Third run
    """
    l = generate_values_list()
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            conn.execute(create_statement)
            conn.executemany(insert_statement, l)

    except sqlite3.OperationalError as e:
        print(e)

@time_stuff
def sequential_batch_read():
    """
    We read all the rows for each symbol one after the other in sequence
    First run
    3.661222331999852 seconds
    Second run
    2.2836898810001003 seconds
    Third run
    0.24514851899994028 seconds
    Fourth run
    0.24082150699996419 seconds
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            conn.execute(create_statement)
            symbols = conn.execute("SELECT DISTINCT symbol FROM database_threading_test").fetchall()
            for symbol in symbols:
                results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)  



def sqlite_threaded_read_task(symbol):
    results = []
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)
    finally:
        return results

def sqlite_multiprocessed_read_task(symbol):
    results = []
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    try:
        with conn:
            results = conn.execute("SELECT * FROM database_threading_test WHERE symbol=?", symbol).fetchall()
    except sqlite3.OperationalError as e:
        print(e)
    finally:
        return results

@time_stuff
def sqlite_threaded_read():
    """
    1000 symbols, 100 rows per symbol
    First run
    9.429676861000189 seconds
    Second run
    10.18928106400017 seconds
    Third run
    10.382290903000467 seconds
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
    with ThreadPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_threaded_read_task, symbols, chunksize=50)
        for result in results:
            pass

@time_stuff
def sqlite_multiprocessed_read():
    """
    1000 symbols, 100 rows
    First run
    0.2484774920012569 seconds!!!
    Second run
    0.24322178500005975 seconds
    Third run
    0.2863524549993599 seconds
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    symbols = conn.execute("SELECT DISTINCT SYMBOL from database_threading_test").fetchall()
    with ProcessPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_multiprocessed_read_task, symbols, chunksize=50)
        for result in results:
            pass

def sqlite_threaded_write_task(n):
    """
    We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    data = list(generate_values())
    try:
        with conn:
            conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
    except sqlite3.OperationalError as e:
        print("Database locked",e)
    finally:
        conn.close()
        return len(data)

def sqlite_multiprocessed_write_task(n):
    """
    We ignore the database locked errors here. Ideal case would be to retry but there is no point writing code for that if it takes longer than a sequential write even without database locke errors
    """
    conn = sqlite3.connect(os.path.realpath('../files/ohlc.db'))
    data = list(generate_values())
    try:
        with conn:
            conn.executemany("INSERT INTO database_threading_test VALUES(?,?,?,?,?,?,?,?)",data)
    except sqlite3.OperationalError as e:
        print("Database locked",e)
    finally:
        conn.close()
        return len(data)

@time_stuff
def sqlite_threaded_write():
    """

    Did not write all the results but the outcome with 97400 rows written is still this...
    Takes 20x the amount of time as a normal write
    1000 symbols, 100 rows
    First run
    28.17819765000013 seconds
    Second run
    25.557972323000058 seconds
    Third run
    """
    symbols = [i for i in range(1000)]
    with ThreadPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_threaded_write_task, symbols, chunksize=50)
        for result in results:
            pass

@time_stuff
def sqlite_multiprocessed_write():
    """
    1000 symbols, 100 rows
    First run
    30.09209805699993 seconds
    Second run
    27.502465319000066 seconds
    Third run
    """
    symbols = [i for i in range(1000)]
    with ProcessPoolExecutor(max_workers=8) as e:
        results = e.map(sqlite_multiprocessed_write_task, symbols, chunksize=50)
        for result in results:
            pass


sqlite_normal_write()

Upvotes: 5

dugres
dugres

Reputation: 13103

You can use something like that.

Upvotes: 1

Martin Beckett
Martin Beckett

Reputation: 96167

Depending on the data rate sqlite could be exactly the correct way to do this. The entire database is locked for each write so you aren't going to scale to 1000s of simultaneous writes per second. But if you only have a few it is the safest way of assuring you don't overwrite each other.

Upvotes: 0

paprika
paprika

Reputation: 2484

Depending on the application the DB could be a real overhead. If we are talking about volatile data, maybe you could skip the communication via DB completely and share the data between the data gathering process and the data serving process(es) via IPC. This is not an option if the data has to be persisted, of course.

Upvotes: 0

S.Lott
S.Lott

Reputation: 392010

"...create several threads that will gather data at a specified interval and cache that data locally into a sqlite database. Then in the main thread start a CherryPy app that will query that sqlite db and serve the data."

Don't waste a lot of time on threads. The things you're describing are simply OS processes. Just start ordinary processes to do gathering and run Cherry Py.

You have no real use for concurrent threads in a single process for this. Gathering data at a specified interval -- when done with simple OS processes -- can be scheduled by the OS very simply. Cron, for example, does a great job of this.

A CherryPy App, also, is an OS process, not a single thread of some larger process.

Just use processes -- threads won't help you.

Upvotes: 1

Ali Afshar
Ali Afshar

Reputation: 41667

Short answer: Don't use Sqlite3 in a threaded application.

Sqlite3 databases scale well for size, but rather terribly for concurrency. You will be plagued with "Database is locked" errors.

If you do, you will need a connection per thread, and you have to ensure that these connections clean up after themselves. This is traditionally handled using thread-local sessions, and is performed rather well (for example) using SQLAlchemy's ScopedSession. I would use this if I were you, even if you aren't using the SQLAlchemy ORM features.

Upvotes: 8

Related Questions