Reputation: 33
I have over a million snapshot files that I need to merge into a single file/DB for analysis.
My attempt is in the code below: first, it reads a small CSV from a list of URLs, takes a few columns, parses the date field from text to a date, and writes it to a SQLite database.
While this code works well enough on a small subset of files, it is too slow to iterate over a million CSVs.
I'm not sure how to increase performance, or even whether Python is the right tool for the job. Any help improving this code, or other suggestions, would be much appreciated.
import pandas as pd
from sqlalchemy import create_engine
import datetime
import requests
import csv
import io
csv_database2 = create_engine('sqlite:///csv_database_test.db')
col_num = [0,8,9,12,27,31]
with open('url.csv','r') as line_list:
    reader = csv.DictReader(line_list)
    for line in reader:
        data = requests.get(line['URL'])
        df = pd.read_csv(io.StringIO(data.text), usecols=col_num, infer_datetime_format=True)
        df.columns.values[0] = 'DateTime'
        df['ParseDateTime'] = [datetime.datetime.strptime(t, "%a %b %d %H:%M:%S %Y") for t in df.DateTime]
        df.to_sql('LineList', csv_database2, if_exists='append')
Upvotes: 3
Views: 266
Reputation: 3744
IMHO Python is well suited for this task, and with simple modifications you can achieve your desired performance.
AFAICS there could be two bottlenecks that affect performance:
You download a single file at a time. If downloading one file takes 0.2 sec, downloading 1M files will take more than 2 days! I suggest you parallelize the downloads; example code using concurrent.futures:
from concurrent.futures import ThreadPoolExecutor
import csv
import datetime
import io
import pandas as pd
import requests
from sqlalchemy import create_engine

csv_database2 = create_engine('sqlite:///csv_database_test.db')
col_num = [0, 8, 9, 12, 27, 31]

def insert_url(line):
    """download single csv url and insert it to SQLite"""
    data = requests.get(line['URL'])
    df = pd.read_csv(io.StringIO(data.text), usecols=col_num,
                     infer_datetime_format=True)
    df.columns.values[0] = 'DateTime'
    df['ParseDateTime'] = [
        datetime.datetime.strptime(t, "%a %b %d %H:%M:%S %Y")
        for t in df.DateTime]
    df.to_sql('LineList', csv_database2, if_exists='append')

with open('url.csv', 'r') as line_list:
    lines = list(csv.DictReader(line_list))

with ThreadPoolExecutor(max_workers=128) as pool:
    pool.map(insert_url, lines)
Also take a look at how to optimize the SQL insertions, as discussed in this SO answer.
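For illustration, here is a minimal sketch (not taken from that answer) of one such optimization: instead of calling to_sql once per downloaded CSV, buffer the parsed DataFrames and append them in batches, so SQLite performs far fewer, larger insert transactions. The batch size of 500 below is an arbitrary assumption; tune it to your memory budget.

from sqlalchemy import create_engine
import pandas as pd
import requests
import datetime
import csv
import io

# same setup as in the question
csv_database2 = create_engine('sqlite:///csv_database_test.db')
col_num = [0, 8, 9, 12, 27, 31]
BATCH = 500  # hypothetical batch size

def flush(frames):
    """Concatenate the buffered DataFrames and append them in one call."""
    if frames:
        pd.concat(frames, ignore_index=True).to_sql(
            'LineList', csv_database2, if_exists='append', index=False)
        frames.clear()

with open('url.csv', 'r') as line_list:
    lines = list(csv.DictReader(line_list))

frames = []
for line in lines:
    data = requests.get(line['URL'])
    df = pd.read_csv(io.StringIO(data.text), usecols=col_num)
    df.columns.values[0] = 'DateTime'
    df['ParseDateTime'] = [
        datetime.datetime.strptime(t, "%a %b %d %H:%M:%S %Y")
        for t in df.DateTime]
    frames.append(df)
    if len(frames) >= BATCH:
        flush(frames)

flush(frames)  # write whatever is left over

The chunksize and method='multi' arguments of to_sql are further knobs worth experimenting with.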
Upvotes: 1