Reputation: 1314
I am relatively new to python multiprocessing and struggling with a lot of problems related to this topic. My newest problem is the combination from multiprocessing, sqlalchemy and postgres. With this combination i sometimes got an
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: decryption failed or bad record mac
After research i found this hint in the documentation:
https://docs.sqlalchemy.org/en/13/core/pooling.html "It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.
There are two approaches to dealing with this.
The first is, either create a new Engine within the child process, or upon an existing Engine, call Engine.dispose() before the child process uses any connections. This will remove all existing connections from the pool so that it makes all new ones. "
and this:
uWSGI, Flask, sqlalchemy, and postgres: SSL error: decryption failed or bad record mac "The issue ended up being uwsgi's forking.
When working with multiple processes with a master process, uwsgi initializes the application in the master process and then copies the application over to each worker process. The problem is if you open a database connection when initializing your application, you then have multiple processes sharing the same connection, which causes the error above."
My Interpretation is that when using multiprocessing i have to assure that every process uses a new Engine. In my child-processes there is only one class which reads and writes to the postgres-db so I decided to define an slqalchemy engine inside the class:
class WS_DB_Booker():
def __init__(self):
engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
Base_inside_class = declarative_base()
Base_inside_class.metadata.create_all(engine_inside_class)
session_factory_inside_class = sessionmaker(bind=engine_inside_class)
self.DBSession_inside_class = scoped_session(session_factory_inside_class)
def example_method_to_read_from_db(self):
try:
sql_alc_session = self.DBSession_inside_class()
sql_alc_session.query(and_so_on....
This works fine without any problems in the first trials. But i am not sure is this a proper way to define the engine inside a class or can this lead to any problems?
Upvotes: 4
Views: 980
Reputation: 21582
It is not really understandable how you fork your processes or which component does the forking.
What you need to make sure is that you instantiate the WS_DB_Broker
class after the forking!
If you do it the wrong way (instantiate before fork) then the Engine
may already have references to some dbapi
connections in it's Pool
. See the SQLAlchemy documentation on working with engines for this.
In order to make your error more apparent you may do something like:
import os
class WS_DB_Booker():
def __init__(self):
# Remember the process id from the time of instantiation. If the
# interpreter is forked then the output of `os.getpid()` will change.
self._pid = os.getpid()
engine_inside_class = create_engine(botpak.bas.dontgitp.bot_engine_string)
Base_inside_class = declarative_base()
Base_inside_class.metadata.create_all(engine_inside_class)
session_factory_inside_class = sessionmaker(bind=engine_inside_class)
self._session = scoped_session(session_factory_inside_class)
def get_session():
if self._pid != os.getpid():
raise RuntimeError("Forked after instantiating! Please fix!")
return self._session()
def example_method_to_read_from_db(self):
try:
sql_alc_session = self.get_session()
# ^^^^^^^^^^^^^
# this may throw RuntimeError when used incorrectly, thus saving you
# from your own mistake.
sql_alc_session.query(and_so_on....
Upvotes: 3