dark horse

Reputation: 3719

Airflow - Broken DAG - Timeout

I have a DAG that executes a function which connects to a Postgres DB, deletes the contents of a table, and then inserts a new data set.

I am trying this locally, and when I run it the web server takes a long time to connect and in most cases doesn't succeed. However, as part of the loading process it seems to be executing the queries from the back end. Since one of the functions deletes rows, I can see the data getting deleted from the table (basically one of the functions gets executed) even though I have not scheduled the script or triggered it manually. Could someone advise as to what I am doing wrong here?

One error that pops out in the UI is

Broken DAG: [/Users/user/airflow/dags/dwh_sample23.py] Timeout

I also see an i next to the DAG id in the UI that says This DAG isn't available in the web server's DAG object. Given below is the code I am using:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
    ''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Success')
    return(dwh_connection)

def tbl1_del():
    ''' This function clears all rows from tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute("""DELETE FROM tbl1;""")
    dwh_connection.commit()


def pop_tbl1():
    ''' This function populates all rows in tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute(""" INSERT INTO tbl1
        select id,name,price from tbl2;""")
    dwh_connection.commit()



db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()

##########################################


t1 = BashOperator(
    task_id='DB_Connect',
    python_callable=db_login(),
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)

t2 = BashOperator(
    task_id='del',
    python_callable=tbl1_del(),
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)

t3 = BashOperator(
    task_id='populate',
    python_callable=pop_tbl1(),
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)

Could anyone assist? Thanks.

Upvotes: 10

Views: 18655

Answers (2)

Barak1731475

Reputation: 779

This is really old by now, but we hit this error in production and I found this question, and I think it's nice for it to have an answer.

Some of the code is getting executed during DAG load, i.e. you actually run

db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()
##########################################

inside the webserver and scheduler loops, when they load the DAG definition from the file. I believe you didn't intend that to happen. Everything should work just fine if you simply remove these 4 lines.

In general, don't place calls to functions you want the executors to run at file/module level, because when the scheduler's or webserver's interpreter loads the file to get the DAG definition, it will invoke them.

Just try putting this in your DAG file and check the webserver logs to see what happens.

from time import sleep
def do_some_printing():
    print(1111111)
    sleep(60)

do_some_printing()  # called at import time, so every DAG parse blocks here for 60 seconds
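
By contrast, here is a minimal sketch of the safe pattern (the DAG id, the 60-second sleep and the task name are purely illustrative, not from the original question): the slow function is only defined at module level and handed to an operator, so parsing the file stays fast and the sleep only happens when a worker actually runs the task.

from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_some_printing():
    print(1111111)
    sleep(60)


dag = DAG('slow_task_demo', start_date=datetime(2018, 5, 21), schedule_interval=None)

# The function is only referenced here, never called, so loading this file is fast;
# the 60-second sleep happens inside the worker when the task instance runs.
slow_task = PythonOperator(
    task_id='do_some_printing',
    python_callable=do_some_printing,  # the function itself, not do_some_printing()
    dag=dag)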

Upvotes: 1

gruby

Reputation: 990

Instead of using BashOperator you can use PythonOperator and run db_login(), tbl1_del() and pop_tbl1() through each PythonOperator's python_callable:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
    ''' This function connects to the Data Warehouse and returns the connection used to execute queries '''
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Success')
    return(dwh_connection)


def tbl1_del():
    ''' This function clears all rows from tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute("""DELETE FROM tbl1;""")
    dwh_connection.commit()


def pop_tbl1():
    ''' This function populates all rows in tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute(""" INSERT INTO tbl1
        select id,name,price from tbl2;""")
    dwh_connection.commit()

##########################################


t1 = PythonOperator(
    task_id='DB_Connect',
    python_callable=db_login,
    dag=dag)

t2 = PythonOperator(
    task_id='del',
    python_callable=tbl1_del,
    dag=dag)

t3 = PythonOperator(
    task_id='populate',
    python_callable=pop_tbl1,
    dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)
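
Note that python_callable must be given the function itself (db_login, not db_login()); otherwise the function is executed while the DAG file is being parsed, which is exactly what causes the timeout. Also, each PythonOperator task typically runs in its own process, so the global dwh_connection opened in the DB_Connect task will not be visible to the del and populate tasks. A minimal sketch of one way around that, reusing the imports and dag object from the snippet above (the refresh_tbl1 name is illustrative, not part of the original answer), is to do the whole refresh inside a single callable:

def refresh_tbl1():
    ''' Connect, clear tbl1 and repopulate it inside one task run. '''
    conn = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
    try:
        cur = conn.cursor()
        cur.execute("DELETE FROM tbl1;")
        cur.execute("INSERT INTO tbl1 select id,name,price from tbl2;")
        conn.commit()
    finally:
        conn.close()

t_refresh = PythonOperator(
    task_id='refresh_tbl1',
    python_callable=refresh_tbl1,  # the function itself, so nothing runs at parse time
    dag=dag)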

Upvotes: 1
