dvreed77

Reputation: 2387

SQLAlchemy + Celery with scoped_session error

I am trying to run a celery beat job that kicks off a bunch of parallel tasks, but I am getting the error: ResourceClosedError: This result object does not return rows. It has been closed automatically.

Here are my relevant files. Notice that I am using a scoped_session:

# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine(SETTINGS['DATABASE_URL'], pool_recycle=3600, pool_size=10)
db_session = scoped_session(sessionmaker(
    autocommit=False, autoflush=False, bind=engine))
# tasks.py
from sqlalchemy import exists

from db import db_session

@app.task
def db_task(pid):
    db_session()
    r = db_session.query(exists().where(RSSSummary.id == pid)).scalar()

    print pid, r
    db_session.remove()


@app.task
def sched_test():
    ids = [0, 1]

    db_task.delay(ids[0])
    db_task.delay(ids[1])
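
For reference, sched_test is triggered periodically by celery beat. A minimal sketch of such a beat entry, in the Celery 4 app.conf.beat_schedule style (the entry name and interval below are illustrative placeholders, not the actual config):

# celery beat wiring (illustrative placeholders only)
app.conf.beat_schedule = {
    'run-sched-test': {
        'task': 'tasks.sched_test',
        'schedule': 300.0,  # run every 5 minutes
    },
}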

And then when I try to initiate sched_test, like so:

>>> tasks.sched_test.delay()

DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

and

ResourceClosedError: This result object does not return rows. It has been closed automatically.

I believe I am using scoped_session properly.

Any suggestions?

Upvotes: 7

Views: 3585

Answers (1)

Randy Syring

Reputation: 2109

I had the same error, along with other errors like:

DatabaseError: server sent data ("D" message) without prior row description ("T" message)
lost synchronization with server: got message type "�", length -1244613424

DatabaseError: lost synchronization with server: got message type "0", length 842674226

It turns out that this was because my Celery worker processes were sharing SQLAlchemy's pooled connections. The SQLAlchemy docs address this:

It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.

I fixed this by using Celery's worker_process_init signal to invalidate all existing connections in the pool when each worker process starts:

from celery.signals import worker_process_init

@worker_process_init.connect
def prep_db_pool(**kwargs):
    """
    When Celery forks the parent process, the db engine & connection pool are included in that.
    But the db connections should not be shared across processes, so we tell the engine
    to dispose of all existing connections, which will cause new ones to be opened in the child
    processes as needed.
    More info: https://docs.sqlalchemy.org/en/latest/core/pooling.html#using-connection-pools-with-multiprocessing
    """
    # The "with" here is for a Flask app using Flask-SQLAlchemy. If you don't
    # have a Flask app, just remove the "with" and call .dispose() on your
    # SQLAlchemy db engine directly.
    with some_flask_app.app_context():
        db.engine.dispose()
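
If you are not using Flask-SQLAlchemy, the same idea is just a dispose() on the engine itself. A minimal sketch, assuming the engine lives in a db module like the one in the question (the import path is the only assumption here):

from celery.signals import worker_process_init

from db import engine  # the engine created with create_engine() in db.py

@worker_process_init.connect
def prep_db_pool(**kwargs):
    # Discard the connections inherited from the parent process so each
    # forked worker opens its own fresh connections on first use.
    engine.dispose()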

Upvotes: 7
