Reputation: 115
I have a 10 million row table in a MySQL database which I need to read, run some validation checks on from my client machine, and load into a table in a Postgres database. I am able to successfully get the data onto my machine, but I am running into out-of-memory issues while trying to process the data and load it into the Postgres database.
Here is the code I currently have:
from sqlalchemy import create_engine, MetaData, Table
# MySQL database connection
source_engine = create_engine('mysql+pymysql://user:pwd@serveraddress:3306/dbname')
source_connection = source_engine.connect()
# Read the entire data
data = source_connection.execute('SELECT * FROM table')
# close the MySQL connection
source_connection.close()
# function to transform data
def transform(data):
    def process_row(row):
        """do data validation on the row"""
        return row
    # process and return the incoming dataset as a list of dicts
    processed_data = [dict(zip(data.keys(), process_row(d))) for d in data]
    return processed_data
transformed_data = transform(data)
# Postgres database connection
dest_connection = create_engine('postgresql://user:pwd@serveraddress:5432/dbname')
dest_meta = MetaData(bind=dest_connection, reflect=True, schema='test')
table = Table('table_name', dest_meta, autoload=True)
dest_connection.execute(table.insert().values(transformed_data))
dest_connection.dispose()
Can anyone suggest a simple way to do this?
Upvotes: 4
Views: 3583
Reputation: 1158
You are on the right path! I had the same problem with some code I was working on a couple of weeks ago.
One way to accomplish what you want and avoid memory issues is to do the reading part inside a function that loops over your query and yields batches of rows. This saves memory and lets you do your operation in chunks. The downside is that it will take more time to execute, but you will definitely save a lot of computer horsepower. I don't have much information about your data, but the code would look something like this:
from sqlalchemy import create_engine, MetaData, Table
# MySQL database connection
source_engine = create_engine('mysql+pymysql://user:pwd@serveraddress:3306/dbname')
source_connection = source_engine.connect()
# Read the data in batches
def read_data():
    '''reads the data and yields it in batches to save memory'''
    data = source_connection.execute('SELECT * FROM table')
    batch_counter = 0
    batch_of_rows = []
    for row in data:
        batch_of_rows.append(row)
        batch_counter = batch_counter + 1
        if batch_counter == 5000:  # set this to the batch size that optimizes your code for memory and execution time
            yield batch_of_rows
            batch_counter = 0
            batch_of_rows = []
    # yield whatever is left over in the last partial batch
    if batch_of_rows:
        yield batch_of_rows
    # close the MySQL connection
    source_connection.close()
# function to transform data
def transform(batch):
    def process_row(row):
        """do data validation on the row"""
        return row
    # process and return the incoming batch as a list of dicts
    processed_data = [dict(zip(row.keys(), process_row(row))) for row in batch]
    return processed_data
# Postgres database connection
dest_connection = create_engine('postgresql://user:pwd@serveraddress:5432/dbname')
dest_meta = MetaData(bind=dest_connection, reflect=True, schema='test')
table = Table('table_name', dest_meta, autoload=True)
for batch in read_data():
    transformed_data = transform(batch)
    dest_connection.execute(table.insert().values(transformed_data))
dest_connection.dispose()
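One more thing to watch out for: depending on the driver, `source_connection.execute('SELECT * FROM table')` may still buffer the entire result set on the client before the generator sees a single row, in which case the batching alone won't cure the memory problem. If that happens, you can ask SQLAlchemy for a streaming result and pull rows with `fetchmany()`. This is only a sketch, assuming a SQLAlchemy 1.x setup like yours and that your MySQL driver honours the `stream_results` option; the connection string is the same placeholder as above:

from sqlalchemy import create_engine

source_engine = create_engine('mysql+pymysql://user:pwd@serveraddress:3306/dbname')

def read_data(batch_size=5000):
    '''stream rows from MySQL and yield them in batches'''
    with source_engine.connect() as conn:
        # stream_results asks the driver for an unbuffered (server-side) cursor,
        # so rows are not all pulled into client memory up front
        result = conn.execution_options(stream_results=True).execute('SELECT * FROM table')
        while True:
            batch_of_rows = result.fetchmany(batch_size)
            if not batch_of_rows:
                break
            yield batch_of_rows

The rest of the pipeline stays the same: feed each batch through transform() and insert it into the Postgres table as above.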
I think that will solve your memory issues.
Note: if you need some extra explanation about yield, visit this stackoverflow question.
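If it helps to see yield in isolation, here is a minimal, database-free sketch of the same batching idea (plain Python, nothing here is specific to your tables):

def batches(items, batch_size):
    '''yield items in lists of at most batch_size; nothing is materialized up front'''
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # last partial batch
        yield batch

for chunk in batches(range(12), 5):
    print(chunk)  # prints [0..4], then [5..9], then [10, 11]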
Upvotes: 3