fellowCoder

Reputation: 69

Multithreading to load data into an SQLite DB

I'm downloading data from an API and storing it in an SQLite database. I want to implement this process using multithreading. Can someone please help me with how to implement it?

I found a library (concurrent.futures), but I'm getting an error. Below is the code.

import sqlite3
import os

import pandas as pd
from sodapy import Socrata

import concurrent.futures

dbPath = 'folder where db exists'
dbName = 'db file name'

## Set up the connection to the DB (cursors are created per county in getData)
dbConn = sqlite3.connect(os.path.join(dbPath, dbName), check_same_thread=False)

## Set up the Socrata API client (the data itself is fetched per county in getData)
client = Socrata("health.data.ny.gov", None)

## Define all the counties to be used in threading
countys = [...]  # placeholder: all 62 counties in New York

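## One separate dict per county (dict.fromkeys(countys, {}) would make every county share the same dict)
varDict = {county: {} for county in countys}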
strDataList = ['test_date', 'LoadDate']
intDataList = ['new_positives', 'cumulative_number_of_positives', 'total_number_of_tests', 'cumulative_number_of_tests']


def getData(county):
    
    ## Check if table exists
    print("Processing ", county)
    varDict[county]['dbCurs'] = dbConn.cursor()
    varDict[county]['select'] = varDict[county]['dbCurs'].execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (county,))
    if not len(varDict[county]['select'].fetchall()):
        createTable(county)
    
    whereClause = 'county="'+county+'"'
    varDict[county]['results'] = client.get("xdss-u53e", where=whereClause)
    varDict[county]['data'] = pd.DataFrame.from_records(varDict[county]['results'])
    varDict[county]['data'].drop(['county'], axis=1, inplace=True)
    varDict[county]['data']['LoadDate'] = pd.to_datetime('now')
    varDict[county]['data'][strDataList] = varDict[county]['data'][strDataList].astype(str)
    varDict[county]['data']['test_date'] = varDict[county]['data']['test_date'].apply(lambda x: x[:10])
    varDict[county]['data'][intDataList] = varDict[county]['data'][intDataList].astype(int)
    varDict[county]['data'] = varDict[county]['data'].values.tolist()

    ## Insert values into SQLite
    varDict[county]['sqlQuery'] = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)'
    varDict[county]['dbCurs'].executemany(varDict[county]['sqlQuery'], varDict[county]['data'])
    dbConn.commit()
    
# for i in dbCurs.execute('SELECT * FROM albany'):
#     print(i)

def createTable(county):
    
    sqlQuery = 'CREATE TABLE ['+county+'] ( [Test Date] TEXT, [New Positives] INTEGER NOT NULL, [Cumulative Number of Positives] INTEGER NOT NULL, [Total Number of Tests Performed] INTEGER NOT NULL, [Cumulative Number of Tests Performed] INTEGER NOT NULL, [Load date] TEXT NOT NULL, PRIMARY KEY([Test Date]))'
    varDict[county]['dbCurs'].execute(sqlQuery)
    

# for _ in countys:
#     getData(_)
    
# x = countys[:5]

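with concurrent.futures.ThreadPoolExecutor() as executor:
    # results = [executor.submit(getData, y) for y in x]
    # Iterate over map's results so that exceptions raised inside getData are re-raised here
    for _ in executor.map(getData, countys):
        pass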
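On executor.map: it returns an iterator, and an exception raised inside the worker function (here, getData) is only re-raised when the corresponding result is pulled from that iterator. If the results are never consumed, the exception is silently dropped when the with block exits. A minimal sketch, using a hypothetical work function as a stand-in for getData, that contrasts ignoring the results, consuming them with list(), and the submit/as_completed pattern, which lets you record failures per item:

```python
import concurrent.futures


def work(n):
    # Hypothetical stand-in for getData: fails for one input
    if n == 3:
        raise ValueError(f"bad input {n}")
    return n * n


# 1) Results never consumed: the ValueError for n == 3 is silently lost
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(work, range(5))

# 2) Consuming the iterator re-raises the first failure (in input order)
errors = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    try:
        list(executor.map(work, range(5)))
    except ValueError as exc:
        errors.append(str(exc))

# 3) submit + as_completed: collect results and failures per item
results, failures = {}, {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(work, n): n for n in range(5)}
    for future in concurrent.futures.as_completed(futures):
        n = futures[future]
        try:
            results[n] = future.result()
        except ValueError as exc:
            failures[n] = str(exc)

print(errors)    # ['bad input 3']
print(failures)  # {3: 'bad input 3'}
print(sorted(results.items()))  # [(0, 0), (1, 1), (2, 4), (4, 16)]
```

The third form is handy for a per-county loader, since one failing county doesn't hide which others succeeded.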

getData is the function that fetches the data for one county and loads it into the DB, and countys is a list of all the counties. I can do this synchronously, but I'd like to use multithreading. The synchronous for loop (which works) is:

for county in countys:
    getData(county)
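A pitfall worth knowing when moving code like this from a sequential loop to threads: dict.fromkeys(keys, {}) evaluates the {} once, so every key maps to the same dict object. In a sequential loop that is harmless, because each call rewrites the entries (dbCurs, data, ...) before reading them. With threads, one county's call can overwrite those entries while another county's call is still using them. A dict comprehension gives each key its own dict. A small demonstration (the county names are only sample keys):

```python
keys = ["Albany", "Bronx"]

# dict.fromkeys evaluates {} once, so every key points at the SAME dict
shared = dict.fromkeys(keys, {})
shared["Albany"]["rows"] = 10
print(shared["Bronx"])                      # {'rows': 10}
print(shared["Albany"] is shared["Bronx"])  # True

# A dict comprehension builds a fresh dict for each key
separate = {key: {} for key in keys}
separate["Albany"]["rows"] = 10
print(separate["Bronx"])                        # {}
print(separate["Albany"] is separate["Bronx"])  # False
```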

The error message is

ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8016 and this is thread id 19844.
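That message comes from Python's sqlite3 module: by default (check_same_thread=True) a connection, and the objects created from it, may only be used by the thread that created the connection. Since the slow part here is most likely the HTTP download, one common workaround is to run only the downloads in worker threads and do every insert from the thread that owns the connection; SQLite allows only one writer at a time anyway, so parallel inserts would gain little. A minimal sketch of that pattern, assuming a hypothetical fetch_rows function in place of the Socrata call and pandas cleanup, a simplified single results table instead of one table per county, and an in-memory database:

```python
import concurrent.futures
import sqlite3


def fetch_rows(county):
    # Hypothetical stand-in for client.get(...) plus the pandas cleanup:
    # returns (test_date, county, new_positives, total_tests) tuples
    return [(f"2020-04-0{day}", county, day, day * 10) for day in range(1, 4)]


def load_all(conn, counties, max_workers=8):
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results "
        "(test_date TEXT, county TEXT, new_positives INTEGER, total_tests INTEGER)"
    )
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Downloads run concurrently in the worker threads
        futures = {executor.submit(fetch_rows, c): c for c in counties}
        for future in concurrent.futures.as_completed(futures):
            rows = future.result()  # re-raises any exception from fetch_rows
            # Every SQLite call happens here, in the thread that created conn
            conn.executemany("INSERT INTO results VALUES (?, ?, ?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")  # default check_same_thread=True is fine here
load_all(conn, ["Albany", "Bronx", "Erie"])
print(conn.execute("SELECT COUNT(*) FROM results").fetchone())  # (9,)
```

To adapt it, fetch_rows would hold the existing download/cleanup code (returning a list of row tuples), while table creation and inserts stay in the main thread.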

Upvotes: 0

Views: 795

Answers (1)

Superzarzar

Reputation: 539

You might find the check_same_thread=False argument useful:

sqlite3.connect(":memory:", check_same_thread=False)
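With check_same_thread=False, sqlite3 stops rejecting calls made from other threads, but the Python documentation notes that write operations may then need to be serialized by the user to avoid data corruption. A minimal sketch of sharing one connection behind a threading.Lock; load_county and the results table are hypothetical, and ":memory:" would be replaced by the real database path:

```python
import concurrent.futures
import sqlite3
import threading

# One connection shared by all threads; check_same_thread=False turns off
# sqlite3's same-thread check, so serializing access is up to us
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE results (county TEXT, n INTEGER)")
db_lock = threading.Lock()


def load_county(county):
    # Hypothetical stand-in for the download/cleanup step (no DB access here)
    rows = [(county, i) for i in range(5)]
    # Only one thread at a time may touch the shared connection
    with db_lock:
        conn.executemany("INSERT INTO results VALUES (?, ?)", rows)
        conn.commit()
    return len(rows)


counties = ["Albany", "Bronx", "Erie", "Kings"]
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    inserted = list(executor.map(load_county, counties))

print(inserted)  # [5, 5, 5, 5]
print(conn.execute("SELECT COUNT(*) FROM results").fetchone())  # (20,)
```

Keeping the download outside the lock is what lets the threads overlap; only the short insert/commit section is serialized.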

Upvotes: 1
