Tom Hood

Reputation: 537

Python using multiprocessing on PostgreSQL queries reduce runtime

I have the following Python code, which executes 5 queries in a row. The average runtime per query is around 181.1 seconds (~3 minutes), and the total runtime for all 5 queries is 905.4 seconds (~15 minutes). After loading the data into DataFrames, I will eventually perform ETL work (mostly looking for errors, data quality issues, and inconsistencies), but before that I want to try to leverage multiprocessing to shave off runtime. I am not familiar with multiprocessing in Python, so I have been reading about different methodologies (Queue vs. Pool, etc.). Which method would work best for this workflow, and how would I implement it? Ideally, a multiprocessing-translated version of this code, or a guide to get there, would be great.

Thank you.

EDIT: In case I wasn't clear, I want to run all 5 queries concurrently. One possible issue is appending each DataFrame to the list concurrently; if that is a problem, I am willing to forgo it.

import pandas as pd
import psycopg2
import time
import os

host = os.environ["DBHOST"]
user = os.environ["DBUSER"]
password = os.environ["DBPWD"]  # "pass" is a reserved word in Python, so use another name

db_conn = psycopg2.connect("host='{}' port={} dbname='{}' user={} password={}".format(host,
                                                                                     port,
                                                                                     "db_name",
                                                                                     user,
                                                                                     password))
query_load = [("SELECT column_name_1, COUNT(*) "
            "FROM schema.table "
            "GROUP BY column_name_1 "
            "ORDER BY column_name_1 ASC"),

             ("SELECT column_name_2, COUNT(*) "
            "FROM schema.table "
            "GROUP BY column_name_2 "
            "ORDER BY column_name_2 ASC"),

             ("SELECT column_name_3, COUNT(*) "
            "FROM schema.table "
            "GROUP BY column_name_3 "
            "ORDER BY column_name_3 ASC"),

             ("SELECT column_name_4, COUNT(*) "
            "FROM schema.table "
            "GROUP BY column_name_4 "
            "ORDER BY column_name_4 ASC"),

            ("SELECT column_name_5, COUNT(*) "
            "FROM schema.table "
            "GROUP BY column_name_5 "
            "ORDER BY column_name_5 ASC")]
start_time = time.time()
data_load = []
for query in query_load:
    data_load.append(pd.read_sql(query, db_conn))
elapsed_time = time.time() - start_time
print("Job finished in {} seconds".format(elapsed_time))

Upvotes: 0

Views: 4771

Answers (2)

Arpit Saxena

Reputation: 57

(This is better as a comment to the accepted answer, but I don't have enough reputation for that)

From the psycopg2 documentation: https://www.psycopg.org/docs/usage.html#thread-safety

libpq connections shouldn’t be used by a forked processes, so when using a module such as multiprocessing or a forking web deploy method such as FastCGI make sure to create the connections after the fork.

So passing a connection object to a pool is not safe; instead, a new connection should be created by each process.
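Concretely, that pattern could look something like the sketch below: a Pool initializer runs once in each worker process, after the fork, so every worker opens its own connection. The DSN value, the init_worker/run_query names, and the stand-in query list are placeholders, not from the question.

```python
from multiprocessing import Pool
import pandas as pd
import psycopg2

# Placeholder DSN; the question builds this from os.environ values.
DSN = "host='db_host' port=5432 dbname='db_name' user='db_user' password='db_pass'"

conn = None  # filled in per worker process, after the fork


def init_worker():
    # Runs once inside each worker process, so every process gets
    # its own libpq connection instead of sharing the parent's.
    global conn
    conn = psycopg2.connect(DSN)


def run_query(query):
    # Uses the worker's own connection created in init_worker.
    return pd.read_sql(query, conn)


if __name__ == "__main__":
    query_load = ["SELECT 1"]  # substitute the five queries from the question
    with Pool(processes=5, initializer=init_worker) as pool:
        data_load = pool.map(run_query, query_load)
```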

Upvotes: 1

quamrana

Reputation: 39374

Since you already have a collection of queries, we can organise a function to take one at a time; by using Pool.map, they can run concurrently:

from multiprocessing import Pool
import pandas as pd
import time

# define query_load
# define db_conn

def read_sql(query):
    return pd.read_sql(query, db_conn)

if __name__ == '__main__':
    start_time = time.time()
    with Pool(5) as p:
        data_load = p.map(read_sql, query_load)
    elapsed_time = time.time() - start_time
    print("Job finished in {} seconds".format(elapsed_time))
    # carry on re-processing data_load

Now, I'm assuming that db_conn will allow concurrent requests.

Also note that p.map does the organising of taking results and loading them into a list for you.

Upvotes: 2
