Reputation: 537
I have the following Python code, which executes 5 queries in a row. The average runtime per query is around 181.1 seconds (~3 minutes), and the total runtime for all 5 queries is 905.4 seconds (~15 minutes). Eventually, after loading the data into DataFrames, I am going to perform ETL work (mostly looking for errors, data quality issues, and inconsistencies), but before that I want to try to leverage multiprocessing to shave off some runtime. I am not familiar with multiprocessing in Python, so I have been reading about different methodologies (Queue vs. Pool, etc.). I am curious which method would work best for this workflow, and how I would implement it. Ideally, a multiprocessing version of this code, or a guide to get there, would be great.
Thank you.
EDIT: In case I wasn't clear, I want to run all 5 queries concurrently. One thing that may be an issue is appending each DataFrame to the list concurrently, so I am willing to forgo that if necessary.
import pandas as pd
import psycopg2
import time
import os
host = os.environ["DBHOST"]
user = os.environ["DBUSER"]
pwd = os.environ["DBPWD"]  # "pass" is a reserved keyword in Python, so use another name
port = 5432  # placeholder port
db_conn = psycopg2.connect("host='{}' port={} dbname='{}' user={} password={}".format(
    host, port, "db_name", user, pwd))
query_load = [("SELECT column_name_1, COUNT(*) "
"FROM schema.table "
"GROUP BY column_name_1 "
"ORDER BY column_name_1 ASC"),
("SELECT column_name_2, COUNT(*) "
"FROM schema.table "
"GROUP BY column_name_2 "
"ORDER BY column_name_2 ASC"),
("SELECT column_name_3, COUNT(*) "
"FROM schema.table "
"GROUP BY column_name_3 "
"ORDER BY column_name_3 ASC"),
("SELECT column_name_4, COUNT(*) "
"FROM schema.table "
"GROUP BY column_name_4 "
"ORDER BY column_name_4 ASC"),
("SELECT column_name_5, COUNT(*) "
"FROM schema.table "
"GROUP BY column_name_5 "
"ORDER BY column_name_5 ASC")]
start_time = time.time()
data_load = []
for query in query_load:
    data_load.append(pd.read_sql(query, db_conn))
elapsed_time = time.time() - start_time
print("Job finished in {} seconds".format(elapsed_time))
Upvotes: 0
Views: 4771
Reputation: 57
(This is better as a comment to the accepted answer, but I don't have enough reputation for that)
From the psycopg2 documentation (https://www.psycopg.org/docs/usage.html#thread-safety):
"libpq connections shouldn't be used by forked processes, so when using a module such as multiprocessing or a forking web deploy method such as FastCGI make sure to create the connections after the fork."
So passing the connection object to a pool is not safe; instead, each process should create its own connection.
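For illustration, here is a minimal sketch of that approach, assuming the same environment variables and query_load list as in the question (db_name and the connection details are placeholders):

import os
from multiprocessing import Pool

import pandas as pd
import psycopg2

def read_sql(query):
    # Open the connection inside the worker, i.e. after the fork,
    # per the psycopg2 guidance quoted above.
    conn = psycopg2.connect(host=os.environ["DBHOST"],
                            dbname="db_name",  # placeholder, as in the question
                            user=os.environ["DBUSER"],
                            password=os.environ["DBPWD"])
    try:
        return pd.read_sql(query, conn)
    finally:
        conn.close()

if __name__ == '__main__':
    # query_load as defined in the question
    with Pool(5) as p:
        data_load = p.map(read_sql, query_load)

With 5 workers and 5 queries, each process ends up opening exactly one connection, running one query, and closing the connection before returning its DataFrame.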
Upvotes: 1
Reputation: 39374
Since you already have a collection of queries, we can organise a function to take one at a time, and by using Pool.map they can run concurrently:
from multiprocessing import Pool
import pandas as pd
import time

# define query_load
# define db_conn

def read_sql(query):
    return pd.read_sql(query, db_conn)

if __name__ == '__main__':
    start_time = time.time()
    with Pool(5) as p:
        data_load = p.map(read_sql, query_load)
    elapsed_time = time.time() - start_time
    print("Job finished in {} seconds".format(elapsed_time))
    # carry on re-processing data_load
Now, I'm assuming that db_conn will allow concurrent requests. Also note that p.map does the organising of taking the results and loading them into a list for you.
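One caveat: per the psycopg2 note in the other answer, db_conn shouldn't be shared across forked workers. A sketch of one way to keep this structure while respecting that, assuming the question's environment variables (db_name is a placeholder), is a Pool initializer that opens one connection per worker process:

from multiprocessing import Pool
import os

import pandas as pd
import psycopg2

worker_conn = None  # set per worker by init_worker

def init_worker():
    # Runs once in each worker process, after the fork, so the
    # connection is never shared between processes.
    global worker_conn
    worker_conn = psycopg2.connect(host=os.environ["DBHOST"],
                                   dbname="db_name",  # placeholder
                                   user=os.environ["DBUSER"],
                                   password=os.environ["DBPWD"])

def read_sql(query):
    return pd.read_sql(query, worker_conn)

if __name__ == '__main__':
    # define query_load as in the question
    with Pool(5, initializer=init_worker) as p:
        data_load = p.map(read_sql, query_load)

This keeps the p.map call and result handling identical; only the connection setup moves into the workers.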
Upvotes: 2