Reputation: 5365
I have a process which reads from 4 databases with 4 tables each. I am consolidating that data into 1 postgres database with 4 tables total. (Each of the original 4 databases has the same 4 tables that need to be consolidated.)
The way I am doing it now works using pandas. I read one table from all 4 databases at a time, concatenate the data into one dataframe, then use to_sql to save it to my postgres database. I then repeat the process for the remaining tables.
My issue is speed. One of my tables has about 1-2 million rows per date, so it can take about 5,000-6,000 seconds to finish writing the data to postgres. It is much quicker to write it to a .csv file and then use COPY FROM in pgAdmin.
Here is my current code. Note that there are some helper function calls, but they basically just look up table names. I also do some basic logging, but that is not essential. I am adding a column for the source database, which is required. I also strip '.0' from fields that are actually strings but that pandas reads as floats, and I fill blank integers with 0 and make sure those columns are really of type int.
def query_database(table, table_name, query_date):
    df_list = []
    log_list = []
    for db in ['NJ', 'NJ2', 'LA', 'NA']:
        start_time = time.clock()
        query_timestamp = dt.datetime.now(pytz.timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S')
        engine_name = '{}{}{}{}'.format(connection_type, server_name, '/', db)
        print('Accessing {} from {}'.format((select_database(db)[0][table]), engine_name))
        engine = create_engine(engine_name)
        df = pd.read_sql_query(query.format(select_database(db)[0][table]), engine, params={query_date})
        query_end = time.clock() - start_time
        # tag each row with its source database and build the per-database log record
        df['source_database'] = db
        df['insert_date_utc'] = query_timestamp
        df['row_count'] = df.shape[0]
        df['column_count'] = df.shape[1]
        df['query_time'] = round(query_end, 0)
        df['maximum_id'] = df['Id'].max()
        df['minimum_id'] = df['Id'].min()
        df['source_table'] = table_dict.get(table)
        log = df[['insert_date_utc', 'row_date', 'source_database', 'source_table', 'row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id']].copy()
        df.drop(['row_count', 'column_count', 'query_time', 'maximum_id', 'minimum_id', 'source_table'], inplace=True, axis=1)
        df_list.append(df)
        log_list.append(log)
    # consolidate the per-database frames, then clean up columns and dtypes
    log = pd.concat(log_list)
    log.drop_duplicates(subset=['row_date', 'source_database', 'source_table'], inplace=True, keep='last')
    result = pd.concat(df_list)
    result.drop_duplicates('Id', inplace=True)
    cols = [i.strip() for i in (create_columns(select_database(db)[0][table]))]
    result = result[cols]
    print('Creating string columns for {}'.format(table_name))
    for col in modify_str_cols(select_database(db)[0][table]):
        create_string(result, col)
    print('Creating integer columns for {}'.format(table_name))
    for col in modify_int_cols(select_database(db)[0][table]):
        create_int(result, col)
    log.to_sql('raw_query_log', cms_dtypes.pg_engine, index=False, if_exists='append', dtype=cms_dtypes.log_dtypes)
    print('Inserting {} data into PostgreSQL'.format(table_name))
    result.to_sql(create_table(select_database(db)[0][table]), cms_dtypes.pg_engine, index=False, if_exists='append', chunksize=50000, dtype=create_dtypes(select_database(db)[0][table]))
How can I work a COPY TO and COPY FROM into this to speed it up? Should I just write the .csv files and then loop over those, or can I COPY from memory straight into my postgres database?
Upvotes: 4
Views: 2359
Reputation: 26474
psycopg2 offers a number of copy-specific APIs. If you want to use CSV, you have to use copy_expert (which allows you to specify a full COPY statement).
Normally when I have done this, I have used copy_expert() and a file-like object which iterates through a file on disk. That seems to work reasonably well.
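As a rough sketch of that pattern (the connection string, file name, and table name below are placeholders, not taken from your code):

import psycopg2

# Hypothetical connection and table; the CSV on disk must match the table's column order.
conn = psycopg2.connect("dbname=target user=postgres")
with conn, conn.cursor() as cur, open('data.csv', 'r') as f:
    cur.copy_expert("COPY my_table FROM STDIN WITH (FORMAT csv, HEADER)", f)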
This being said, in your case, I think copy_to and copy_from are better matches because it is simply a postgres-to-postgres transfer here. Note that these use PostgreSQL's copy output/input syntax and not CSV (if you want to use CSV, you have to use copy_expert).
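A minimal sketch of such a transfer, assuming placeholder connection strings and a table called my_table on both sides:

import io
import psycopg2

src = psycopg2.connect("dbname=source user=postgres")
dst = psycopg2.connect("dbname=target user=postgres")

buf = io.StringIO()
with src.cursor() as cur:
    cur.copy_to(buf, 'my_table')      # writes PostgreSQL text format, not CSV
buf.seek(0)                           # rewind before reading it back
with dst, dst.cursor() as cur:
    cur.copy_from(buf, 'my_table')    # expects the same text format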
Before you decide how to do things, note that copy_to copies to a file-like object (such as StringIO), while copy_from/copy_expert load from a file-like object. If you want to use a pandas DataFrame, you are going to have to think about this a little and either create a file-like object, or use csv along with StringIO and copy_expert to generate an in-memory CSV and load that.
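For example, one way that could look (a sketch only; the target table name and connection are assumptions, and the table must already exist with columns in the same order as the DataFrame):

import io
import psycopg2

def copy_dataframe(df, conn, table_name):
    buf = io.StringIO()
    df.to_csv(buf, index=False, header=False)   # in-memory CSV, column order must match the table
    buf.seek(0)
    with conn, conn.cursor() as cur:
        cur.copy_expert('COPY {} FROM STDIN WITH (FORMAT csv)'.format(table_name), buf)

# e.g. copy_dataframe(result, psycopg2.connect("dbname=target"), 'consolidated_table')

This keeps you on the COPY path you already found to be fast, without writing intermediate .csv files to disk.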
Upvotes: 4