Reputation: 13
I am writing a DAG with a BranchPythonOperator to check whether data is available for download. If the data is there, the DAG should download it and incorporate it into my PostgreSQL database. If it isn't, all of the processing tasks should be skipped and the branch should go to a DummyOperator. Unfortunately, the DAG is not skipping all of the tasks: it skips up to 6 of them, then stops (the remaining downstream tasks are left with an unknown status) and the DAG fails. I am not finding any error messages in the logs (because the tasks are not failing).
I'm on Airflow version 1.8.1. I attached some screenshots below. In the following DAG example I replaced sensitive file info with 'XXXXX'. I have also tried the ShortCircuitOperator, but only got it to skip the task directly downstream of the SCO.
Thank you!
from airflow import DAG
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta
default_args = {'owner': 'airflow',
                'depends_on_past': False,
                'start_date': datetime(2018, 1, 2),
                'email_on_failure': False,
                'email_on_retry': False,
                'retries': 1,
                'retry_delay': timedelta(minutes=1)
                }
dag = DAG('Question_DAG',
          template_searchpath="/home/airflow/code",
          schedule_interval='0 10 * * *',
          catchup=True,
          default_args=default_args
          )
data_check = SSHExecuteOperator(task_id='check_for_new_data_and_download',
                                ssh_hook=SSHHook(conn_id='server'),
                                bash_command='download_file.sh',
                                xcom_push=True,
                                dag=dag)
def identify_new_data(**kwargs):
    ''' If DATA has not been uploaded, the DAG does not continue.'''
    new_data_code = kwargs['task_instance'].xcom_pull(task_ids='check_for_new_data_and_download', key=None)
    filecount_type_conversion_success = True
    try:
        new_data_code = int(new_data_code)
    except ValueError:
        filecount_type_conversion_success = False
    # print new_data_code, type(new_data_code)
    # 1 means there is new data, therefore it should update the data tables.
    # 2 means source data was not uploaded
    if new_data_code == 1:
        return 'data_uploaded'
    elif new_data_code == 2 or new_data_code == 3:
        return 'no_data_uploaded'
identify_new_data = BranchPythonOperator(task_id='identify_new_data',
                                         python_callable=identify_new_data,
                                         trigger_rule="all_done",
                                         provide_context=True,
                                         dag=dag)
no_data_uploaded = DummyOperator(task_id="no_data_uploaded",
                                 trigger_rule='all_done',
                                 dag=dag)
data_uploaded = EmailOperator(task_id='data_uploaded',
                              to='myemail@google',
                              subject='File Downloaded',
                              html_content='Hello, This is an auto-generated email to inform you that the monthly data has been downloaded. Thank you.',
                              dag=dag)
################# create_raw_table ################################
create_raw_table = PostgresOperator(task_id='create_raw_table',
                                    postgres_conn_id='warehouse',
                                    sql='create_raw_table.sql',
                                    dag=dag)
################# Convert fixed width file to csv ################################
convert_fixed_width_csv = SSHExecuteOperator(task_id='convert_fixed_width_csv',
                                             ssh_hook=SSHHook(conn_id='server'),
                                             bash_command='convert_fixed_width_csv.sh',
                                             dag=dag)
################# Dedupe ##############
dedupe_on_id = PostgresOperator(task_id='dedupe_on_id',
                                postgres_conn_id='warehouse',
                                sql='dedupe.sql',
                                dag=dag)
################# Date Insert ################################
date_insert = PostgresOperator(task_id='add_dates_raw',
                               postgres_conn_id='warehouse',
                               sql='add_dates.sql',
                               dag=dag)
################ Client Insert ###########################
client_insert = PostgresOperator(task_id='client_insert',
                                 postgres_conn_id='warehouse',
                                 sql='client_insert.sql',
                                 dag=dag)
################# Months Insert ###########################
months_insert = PostgresOperator(task_id='months_insert',
                                 postgres_conn_id='warehouse',
                                 sql='months_insert.sql',
                                 dag=dag)
################# Eligibility Insert ######################
eligibility_insert = PostgresOperator(task_id='eligibility_insert',
                                      postgres_conn_id='warehouse',
                                      sql='eligibility_insert.sql',
                                      dag=dag)
################# Plan Insert ####################
plan_insert = PostgresOperator(task_id='plan_insert',
                               postgres_conn_id='warehouse',
                               sql='plan_insert.sql',
                               dag=dag)
################# Codes ###################################
codes = PostgresOperator(task_id='codes',
                         postgres_conn_id='warehouse',
                         sql='codes.sql',
                         dag=dag)
################# Update Dates ################################
update_dates = PostgresOperator(task_id='update_dates',
                                postgres_conn_id='warehouse',
                                sql='update_dates.sql',
                                dag=dag)
################# Clients ################################
create_clients = PostgresOperator(task_id='create_clients',
                                  postgres_conn_id='warehouse',
                                  sql='clients.sql',
                                  dag=dag)
################# fix_addresses ############
fix_addresses = SSHExecuteOperator(task_id='fix_addresses',
                                   ssh_hook=SSHHook(conn_id='server'),
                                   bash_command='fix_addresses.sh',
                                   dag=dag)
################# Load data ############
load_data_command = """
cd data/
TASKDATE='date +%Y%m'
cp XXXX.TXT /home/admin/data/XXX_loaded/XXX.TXT
"""
load_data = SSHExecuteOperator(task_id='load_data',
                               ssh_hook=SSHHook(conn_id='server'),
                               bash_command=load_data_command,
                               dag=dag)
################# Update system status ################################
system_status = PostgresOperator(task_id='update_system_status',
                                 postgres_conn_id='warehouse',
                                 sql="SELECT update_system_status('new_info')",
                                 dag=dag)
data_check.set_downstream(identify_new_data)
identify_new_data.set_downstream(data_uploaded)
data_uploaded.set_downstream(create_raw_table)
create_raw_table.set_downstream(convert_fixed_width_csv)
convert_fixed_width_csv.set_downstream(dedupe_on_id)
dedupe_on_id.set_downstream(date_insert)
date_insert.set_downstream(client_insert)
client_insert.set_downstream(months_insert)
months_insert.set_downstream(eligibility_insert)
eligibility_insert.set_downstream(plan_insert)
plan_insert.set_downstream(codes)
codes.set_downstream(update_dates)
update_dates.set_downstream(create_clients)
create_clients.set_downstream(fix_addresses)
fix_addresses.set_downstream(load_data)
load_data.set_downstream(system_status)
The attached screenshot shows the Tree View in the Airflow UI; I was using it to troubleshoot which tasks were causing the DAG to fail.
Upvotes: 1
Views: 3166
Reputation: 6548
I believe you're running into the issue described in AIRFLOW-1296. A fix for it landed in Airflow 1.8.2, so I would upgrade and check whether you can still reproduce the problem. The upgrade worked for me, but as you can see in the JIRA comments, results were mixed.
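If you want a quick way to confirm the fix after upgrading, a minimal DAG along these lines exercises the same skip-propagation behaviour (just an illustrative sketch; the dag_id and task names are made up, and it reuses the same 1.8.x-style imports as your DAG). The branch callable always picks 'chosen_path', so every task in the step_* chain should end up skipped once the fix is in place; on an affected version only the first one does.
# Minimal sketch to verify that skips propagate past the first downstream task.
# Illustrative only: dag_id and task names are invented for this test.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG('branch_skip_test',
          start_date=datetime(2018, 1, 2),
          schedule_interval=None)

# Always choose 'chosen_path', so the other branch is never followed.
branch = BranchPythonOperator(task_id='branch',
                              python_callable=lambda: 'chosen_path',
                              dag=dag)

chosen_path = DummyOperator(task_id='chosen_path', dag=dag)
branch.set_downstream(chosen_path)

# Chain of tasks on the branch that is NOT chosen. On an affected version only
# the first task is marked skipped; after the fix the whole chain should be skipped.
previous = branch
for i in range(5):
    step = DummyOperator(task_id='step_%d' % i, dag=dag)
    previous.set_downstream(step)
    previous = step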
Upvotes: 1