Reputation: 670
I am trying to use psycopg2's connection pool with python's multiprocess library.
Currently, attempting to share the connection pool amongst the worker processes in the manner shown below causes:
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
The following code should reproduce the error, with the caveat that the reader has to set up a simple postgres database.
from multiprocessing import Pool
from psycopg2 import pool
import psycopg2
import psycopg2.extras
connection_pool = pool.ThreadedConnectionPool(1, 200, database='postgres',
                                              user='postgres', password='postgres', host='localhost')
class ConnectionFromPool:
"""
Class to establish a connection with the local PostgreSQL database
To use:
query = SELECT * FROM ticker_metadata
with ConnectionFromPool() as cursor:
cursor.execute(query)
results = cursor.fetchall()
Returns:
Arrayed Dictionary of results
[{...},{...},{...}]
"""
def __init__(self):
self.connection_pool = None
self.cursor = None
self.connection = None
def __enter__(self):
self.connection = connection_pool.getconn()
self.cursor = self.connection.cursor(
cursor_factory=psycopg2.extras.RealDictCursor)
return self.cursor
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
self.connection.rollback()
else:
self.cursor.close()
self.connection.commit()
connection_pool.putconn(self.connection)
def test_query(col_attribute):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool() as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result
def multiprocessing(func, args, n_workers=2):
"""spawns multiple processes
Args:
func: function, to be performed
args: list of args to be passed to each call of func
n_workers: number of processes to be spawned
Return:
A list, containing the results of each proccess
"""
with Pool(processes=n_workers) as executor:
res = executor.starmap(func, args)
return list(res)
def main():
    args = [[i] for i in range(1000)]
    results = multiprocessing(test_query, args, 2)
if __name__ == "__main__":
    main()
What I have already tried:
Using a with statement in the SQL query. This throws an error claiming that the connection object is not pickle-able.
Note: If I put a sleep operation in all but one of the processes, the non-sleeping process runs fine and executes its query, until the remaining processes un-sleep; then I get the above error.
What I have already read:
Finally:
How can I use a connection pool (psycopg2) with Python's multiprocessing? I am open to using other libraries so long as they work with Python and PostgreSQL databases.
Upvotes: 6
Views: 3996
Reputation: 670
Here is my solution. The solution can be stated in 2 parts:
1. Have each process create its own connection pool, via a wrapper function.
2. Pass that pool into the query function (test_query).
In more detail, with reference to the example in the question:
Create the wrapper function that re-uses one connection pool per process:
def multi_query(list_of_cols):
    # create a new connection pool per Process
    new_pool = new_connection_pool()
    # Pass the pool to each query
    for col in list_of_cols:
        test_query(col, new_pool)
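The answer calls new_connection_pool() but does not show it. Below is a minimal sketch of what it could look like, assuming the same connection parameters as in the question; only the name comes from the answer, the body is an assumption:

from psycopg2 import pool

def new_connection_pool():
    # Build a fresh pool inside the calling process, so worker processes
    # never reuse connections (and their SSL state) created by the parent.
    return pool.ThreadedConnectionPool(1, 20, database='postgres',
                                       user='postgres', password='postgres',
                                       host='localhost')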
Modify the query function to accept a connection pool:
Old test_query:
def test_query(col_attribute):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool() as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result
New test_query:
def test_query(col_attribute, connection_pool=None):
"""
Simple SQL query
"""
query = f"""SELECT *
FROM col
WHERE col = {col_attribute}
;"""
with ConnectionFromPool(connection_pool) as cursor:
cursor.execute(query)
result = cursor.fetchall()
return result
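For the new test_query to work, ConnectionFromPool also has to use the pool it is given instead of the module-level one, and main() has to hand each process its own chunk of columns via multi_query. The answer does not show these changes, so the following is only a sketch under those assumptions; the chunking in main() is illustrative, not part of the answer:

import psycopg2.extras


class ConnectionFromPool:
    """Context manager that borrows a connection from the pool it is given."""
    def __init__(self, connection_pool):
        self.connection_pool = connection_pool  # per-process pool
        self.cursor = None
        self.connection = None

    def __enter__(self):
        self.connection = self.connection_pool.getconn()
        self.cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.RealDictCursor)
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val is not None:
            self.connection.rollback()
        else:
            self.cursor.close()
            self.connection.commit()
        self.connection_pool.putconn(self.connection)


def main():
    # Split the 1000 columns into one chunk per worker; each worker runs
    # multi_query, which creates its own pool and queries its chunk.
    cols = list(range(1000))
    n_workers = 2
    chunks = [[cols[i::n_workers]] for i in range(n_workers)]
    multiprocessing(multi_query, chunks, n_workers)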
Upvotes: 1