Reputation: 37
I have a 5GB CSV of IP addresses that I need to parse into a MySQL database.
Currently I am reading rows from the CSV and inserting them into MySQL one at a time. It works, but I would love to make it faster.
Could I parallelize the reading and writing somehow? Or perhaps chunk the CSV and spawn separate processes to read and write each split file?
import csv
from csv import reader
from csv import writer
import mysql.connector

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

i = 1
with open('iplist.csv', 'r') as read_obj:
    csv_reader = reader(read_obj)
    for row in csv_reader:
        query = """INSERT INTO ips (ip_start,ip_end,continent) VALUES ('%s','%s','%s')""" % (row[0], row[1], row[2])
        print(query)
        cursor.execute(query)
        cursor.execute('COMMIT')
        print(i)
        i = i + 1
cnx.close()
Any help is appreciated.
Upvotes: 0
Views: 950
Reputation: 31
My recommendation would be to chunk your list. Break it down into chunks of 5,000 rows
(or similar), then iterate through those. This will reduce the number of queries you are making, and query volume seems to be your biggest bottleneck.
https://medium.com/code-85/two-simple-algorithms-for-chunking-a-list-in-python-dc46bc9cc1a2
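For illustration, a minimal sketch of that idea, assuming the same iplist.csv layout and ips table as in the question; the 5,000-row chunk size and the use of executemany to turn each chunk into a single multi-row insert are just one way to do it:

import csv
import itertools
import mysql.connector

CHUNK_SIZE = 5_000  # illustrative chunk size

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

with open('iplist.csv', 'r') as read_obj:
    rows = csv.reader(read_obj)
    while True:
        chunk = list(itertools.islice(rows, CHUNK_SIZE))
        if not chunk:
            break
        # One multi-row INSERT per chunk instead of one query per row.
        cursor.executemany('INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)', chunk)
        cnx.commit()

cnx.close()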
Upvotes: 1
Reputation:
I created a pseudo-random CSV file where each row is of the form "111.222.333.444,555.666.777.888,A continent". The file contains 33 million rows. The following code was able to insert all rows into a MySQL database table in ~3 minutes (a sketch for generating a similar test file is included after the code):
import mysql.connector
import time
import concurrent.futures
import csv
import itertools

CSVFILE = '/Users/Andy/iplist.csv'
CHUNK = 10_000

def doBulkInsert(rows):
    # Each worker opens its own connection and inserts one chunk of rows.
    with mysql.connector.connect(user='andy', password='monster', host='localhost', database='andy') as connection:
        connection.cursor().executemany('INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)', rows)
        connection.commit()

def main():
    _s = time.perf_counter()
    with open(CSVFILE) as csvfile:
        csvdata = csv.reader(csvfile)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Read the CSV in chunks and hand each chunk to the thread pool.
            while (data := list(itertools.islice(csvdata, CHUNK))):
                executor.submit(doBulkInsert, data)
            executor.shutdown(wait=True)
    print(f'Duration = {time.perf_counter()-_s}')

if __name__ == '__main__':
    main()
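For reference, a rough sketch of how a comparable pseudo-random test file could be generated; the row count, octet ranges, and continent names below are made up for illustration:

import csv
import random

CONTINENTS = ['Africa', 'Antarctica', 'Asia', 'Europe', 'North America', 'Oceania', 'South America']

def random_ip():
    # Pseudo-random dotted quad, not necessarily a routable address.
    return '.'.join(str(random.randint(0, 255)) for _ in range(4))

with open('/Users/Andy/iplist.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for _ in range(1_000_000):  # scale up towards 33 million rows for a full-size test
        writer.writerow([random_ip(), random_ip(), random.choice(CONTINENTS)])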
Upvotes: 1
Reputation: 120391
Use cursor.executemany to increase speed:
# Tested with:
# docker run --rm -e MYSQL_ALLOW_EMPTY_PASSWORD=y -p 3306:3306 mysql
#
# CREATE DATABASE ips;
# USE ips;
# CREATE TABLE ips (id INT PRIMARY KEY NOT NULL AUTO_INCREMENT, ip_start VARCHAR(15), ip_end VARCHAR(15), continent VARCHAR(20));
import mysql.connector
import csv
import itertools

CHUNKSIZE = 1000  # Number of lines

cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='ips')
cursor = cnx.cursor()

with open('iplist.csv', 'r') as csvfile:
    reader = csv.reader(csvfile)
    while True:
        records = list(itertools.islice(reader, CHUNKSIZE))
        if not records:
            break
        query = """INSERT INTO ips (ip_start, ip_end, continent) VALUES (%s, %s, %s)"""
        cursor.executemany(query, records)
        cursor.execute('COMMIT')
Upvotes: 2