Reputation: 364
We have an Airflow instance running in AWS Fargate. It connects to an on-premises Postgres server (on Windows) and tries to load data from a (complicated) view, using a PostgresHook for that. However, the task in the DAG fails in Airflow with this error:
File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 120, in get_records
cur.execute(sql)
psycopg2.OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
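For context, the failing call is essentially a get_records on the hook; a minimal sketch of what the task does (the connection id and view name are placeholders, and the import path matches Airflow 1.10 as in the traceback):

```python
from airflow.hooks.postgres_hook import PostgresHook

# 'onprem_postgres' and the view name are placeholders for illustration.
hook = PostgresHook(postgres_conn_id="onprem_postgres")
records = hook.get_records("SELECT * FROM complicated_view")
```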
A while ago, the error occurred after some 10-15 minutes. Now it occurs sooner, after 5 minutes or even less.
I have looked in the Postgres logs, which (confusingly) show that it was the client that closed the connection:
LOG: could not send data to client: An existing connection was forcibly closed by the remote host.
FATAL: connection to client lost
I have tried a bunch of potential solutions already.
Without Airflow
Connecting to the server outside of Airflow, using psycopg2 directly: works (using the complicated view).
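Roughly what that direct test looked like (host, credentials, and view name are placeholders):

```python
import psycopg2

# Direct connection from outside Airflow; all identifiers are placeholders.
conn = psycopg2.connect(host="onprem-host", dbname="mydb",
                        user="me", password="...")
with conn.cursor() as cur:
    cur.execute("SELECT * FROM complicated_view")  # the slow view
    rows = cur.fetchall()
conn.close()
```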
Different table
Trying to load data from a different table from Airflow in the cloud: works, finishes quickly too. So this "timeout" only occurs because the query takes a while.
Running the Airflow container locally
At first I could reproduce this issue, but I (think I) solved it by adding some extra parameters to the Postgres connection string: keepalives=1&keepalives_idle=60&keepalives_interval=60. However, I cannot reproduce this fix in Airflow in the cloud: when I add these parameters there, the error remains.
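For reference, a minimal sketch of passing those keepalive settings as a libpq connection URI, which psycopg2 accepts directly (host, database, and credentials are placeholders):

```python
import psycopg2

# TCP keepalives expressed as libpq URI parameters; host, database,
# and credentials are placeholders.
conn = psycopg2.connect(
    "postgresql://me:secret@onprem-host:5432/mydb"
    "?keepalives=1&keepalives_idle=60&keepalives_interval=60"
)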
Increase timeouts
As described above, I added keepalives, but I also tried to reason about other potential timeouts. I added an execution_timeout to the DAG arguments, to no avail (see the sketch below). We also checked networking timeouts, but given the irregular pattern of the connection failures, it doesn't really sound like such a hard timeout...
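A minimal sketch of that attempt, assuming a hypothetical load task (the callable, task id, and dag object are placeholders):

```python
from datetime import timedelta
from airflow.operators.python_operator import PythonOperator

def load_from_view():
    # placeholder for the actual PostgresHook query
    pass

load_task = PythonOperator(
    task_id="load_from_view",
    python_callable=load_from_view,
    # Bounds how long Airflow lets the task run; it does not keep the
    # database connection itself alive.
    execution_timeout=timedelta(minutes=30),
    dag=dag,  # 'dag' is assumed to be defined elsewhere
)
```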
I am at a loss here. Any suggestions?
Upvotes: 2
Views: 11188
Reputation: 364
Update: we have solved this problem through a workaround. Instead of keeping the connection open while the complex view is being queried, we have turned the connection into an asynchronous connection (i.e., aconn = psycopg2.connect(database='test', async_=1) from the psycopg2 docs; note async_ rather than async, which is a reserved keyword as of Python 3.7). Furthermore, we have turned the view into a materialized view, so that we only issue a REFRESH MATERIALIZED VIEW through the asynchronous connection, and then we can just SELECT * from the materialized view a while later, which is very fast.
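A minimal sketch of that workaround, following the standard psycopg2 asynchronous-connection pattern (connection details and the materialized view name are placeholders):

```python
import select
import psycopg2
import psycopg2.extensions

def wait(conn):
    # Standard psycopg2 loop for driving an asynchronous connection.
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)

# Fire off the slow refresh; async_ because 'async' is reserved in 3.7+.
aconn = psycopg2.connect(database="test", async_=1)
wait(aconn)
acurs = aconn.cursor()
acurs.execute("REFRESH MATERIALIZED VIEW complicated_view_mat")
wait(aconn)

# Later, a plain SELECT * on the materialized view returns quickly.
```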
Upvotes: 1