Reputation: 69
I'm downloading a data from an API and storing it in SQLite db. I want to implement the process using "multithreading". Can someone please help me with how to implement it.
I found a library but getting an error. below is the code.
import sqlite3
import os
import pandas as pd
from sodapy import Socrata
import concurrent.futures
dbPath = 'folder where db exists'
dbName = 'db file name'
## Setup connection & cursor with the DB
dbConn = sqlite3.connect(os.path.join(dbPath, dbName), check_same_thread=False)
## Setup the API and bring in the data
client = Socrata("health.data.ny.gov", None)
## Define all the countys to be used in threading
countys = [all 62 countys in New York]
varDict = dict.fromkeys(countys, {})
strDataList = ['test_date', 'LoadDate']
intDataList = ['new_positives', 'cumulative_number_of_positives', 'total_number_of_tests', 'cumulative_number_of_tests']
def getData(county):
## Check if table exists
print("Processing ", county)
varDict[county]['dbCurs'] = dbConn.cursor()
varDict[county]['select'] = varDict[county]['dbCurs'].execute('SELECT name FROM sqlite_master WHERE type="table" AND name=?', (county,) )
if not len(varDict[county]['select'].fetchall()):
createTable(county)
whereClause = 'county="'+county+'"'
varDict[county]['results'] = client.get("xdss-u53e", where=whereClause)
varDict[county]['data'] = pd.DataFrame.from_records(varDict[county]['results'])
varDict[county]['data'].drop(['county'], axis=1, inplace=True)
varDict[county]['data']['LoadDate'] = pd.to_datetime('now')
varDict[county]['data'][strDataList] = varDict[county]['data'][strDataList].astype(str)
varDict[county]['data']['test_date'] = varDict[county]['data']['test_date'].apply(lambda x: x[:10])
varDict[county]['data'][intDataList] = varDict[county]['data'][intDataList].astype(int)
varDict[county]['data'] = varDict[county]['data'].values.tolist()
## Insert values into SQLite
varDict[county]['sqlQuery'] = 'INSERT INTO ['+county+'] VALUES (?,?,?,?,?,?)'
varDict[county]['dbCurs'].executemany(varDict[county]['sqlQuery'], varDict[county]['data'])
dbConn.commit()
# for i in dbCurs.execute('SELECT * FROM albany'):
# print(i)
def createTable(county):
sqlQuery = 'CREATE TABLE ['+county+'] ( [Test Date] TEXT, [New Positives] INTEGER NOT NULL, [Cumulative Number of Positives] INTEGER NOT NULL, [Total Number of Tests Performed] INTEGER NOT NULL, [Cumulative Number of Tests Performed] INTEGER NOT NULL, [Load date] TEXT NOT NULL, PRIMARY KEY([Test Date]))'
varDict[county]['dbCurs'].execute(sqlQuery)
# for _ in countys:
# getData(_)
# x = countys[:5]
with concurrent.futures.ThreadPoolExecutor() as executor:
# results = [executor.submit(getData, y) for y in x]
executor.map(getData, countys)
getData is the function which brings in the data county wise and loads into the db. Countys is a list of all the countys. I am able to do it synchronously but would like to implement multithreading. The for loop to do it synchronously (which works) is
for _ in countys:
getData(_)
The error message is
ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8016 and this is thread id 19844.
Upvotes: 0
Views: 795
Reputation: 539
You might find this useful
sqlite.connect(":memory:", check_same_thread=False)
Upvotes: 1