Reputation: 317
I set up Airflow 2.0 on a local Windows 10 machine using Ubuntu. I use PostgreSQL as the database, the CeleryExecutor, and RabbitMQ as the Celery backend. I created some DAGs; each one connects to a Redshift database through an SSH tunnel and executes a SQL command. Each DAG runs smoothly when I trigger it manually or run it via the scheduler.
However, I encounter an error when I schedule these DAGs to start running at the same time. For example, if DAG1 and DAG2 both start at 8:00 AM, they fail with the error below:
psycopg2.OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally before or while processing the request.
If I schedule these two DAGs to start at different times, everything runs smoothly. Also, if I combine the two DAGs into one DAG with two tasks, the combined DAG runs fine.
This is my DAG code; it is the same for every DAG (only the SQL commands differ):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
import time

dag = DAG('test', description='Simple test tutorial DAG',
          schedule_interval=None,
          start_date=datetime(2021, 1, 6), tags=['test'])

def select_from_tunnel_db():
    # Open SSH tunnel
    ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
    tunnel = ssh_hook.get_tunnel(remote_port=5439, remote_host='**.**.**.**', local_port=5439)
    tunnel.start()

    # Connect to DB and run query
    pg_hook = PostgresHook(
        postgres_conn_id='dw',  # NOTE: host='localhost'
        schema='test'
    )
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    cursor.execute('''
        insert into abc values (1, 'a')
    ''')
    cursor.close()
    conn.commit()
    conn.close()

python_operator = PythonOperator(
    task_id='test_tunnel_conn',
    python_callable=select_from_tunnel_db,
    dag=dag,  # attach the task to the DAG defined above
)
Upvotes: 0
Views: 2045
Reputation: 317
I found the solution, so I'm coming back to post it here. Hopefully it is useful to others.
The tunnel has a timeout interval (I don't know the exact default, but I'm fairly sure it's less than 1 second), so it needs to be set larger. Add one more line of code after creating your tunnel:
import sshtunnel

sshtunnel.SSH_TIMEOUT = sshtunnel.TUNNEL_TIMEOUT = 5.0
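For context, here is a minimal sketch of how the fix fits into the function from the question, assuming the same SSHHook setup, connection IDs, and redacted host as above; the timeouts are raised right after the tunnel object is created and before start() is called:

import sshtunnel
from airflow.providers.ssh.hooks.ssh import SSHHook

def select_from_tunnel_db():
    # Open the SSH tunnel as in the question
    ssh_hook = SSHHook(ssh_conn_id='dw_ssh')
    tunnel = ssh_hook.get_tunnel(remote_port=5439, remote_host='**.**.**.**', local_port=5439)

    # Raise the module-level sshtunnel timeouts to 5 seconds, as described above;
    # the default is too short when several DAGs open tunnels at the same time
    sshtunnel.SSH_TIMEOUT = sshtunnel.TUNNEL_TIMEOUT = 5.0
    tunnel.start()

    # ... connect with PostgresHook and run the query as before ...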
Upvotes: 1