Xuan-Tu Tran

Reputation: 11

Airflow cannot run DAG because upstream tasks have failed

I am trying to use Apache Airflow to create a workflow. I have installed Airflow manually into my own Anaconda environment on the server.

Here is how I set up the environment to run a simple DAG:

export AIRFLOW_HOME=~/airflow/airflow_home # my airflow home
export AIRFLOW=~/.conda/.../lib/python2.7/site-packages/airflow/bin
export PATH=~/.conda/.../bin:$AIRFLOW:$PATH # my kernel
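With that environment in place, a couple of standard CLI checks should confirm that Airflow picks up the DAGs folder (a quick sanity check, assuming Airflow 1.8's commands; initdb only needs to be run once):

airflow initdb     # one-time initialization of the SQLite metadata database
airflow list_dags  # should list the DAGs found in AIRFLOW_HOME/dags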

When I run a particular task with airflow test, it works, but only for that task in isolation. For example, in dag1 with task1 >> task2:

airflow test dag1 task2 2017-06-22

I expected it to run task1 first and then task2, but it only runs task2 on its own.
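In other words, each airflow test invocation only runs the single task named on the command line; the upstream task is never triggered:

airflow test dag1 task1 2017-06-22   # runs only task1
airflow test dag1 task2 2017-06-22   # runs only task2, task1 is not run first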

Do you have any idea what is going on here? Thank you very much in advance!

Here is my code:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'txuantu',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))


def python_op1(ds, **kwargs):
    print(ds)
    return 0


def python_op2(ds, **kwargs):
    print(str(kwargs))
    return 0

# t1, t2 and t3 are examples of tasks created by instantiating operators
# t1 = BashOperator(
#     task_id='bash_operator',
#     bash_command='echo {{ ds }}',
#     dag=dag)
t1 = PythonOperator(
    task_id='python_operator1',
    python_callable=python_op1,
    # provide_context=True,
    dag=dag)


t2 = PythonOperator(
    task_id='python_operator2',
    python_callable=python_op2,
    # provide_context=True,
    dag=dag)

# declare that t1 must run (and succeed) before t2
t2.set_upstream(t1)

Airflow: v1.8.0, using the SequentialExecutor with SQLite.

When I run:

airflow run tutorial python_operator2 2015-06-01

I get the following error message:

[2017-06-28 22:49:15,336] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags
[2017-06-28 22:49:16,069] {base_executor.py:50} INFO - Adding to queue: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,072] {sequential_executor.py:40} INFO - Executing command: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,765] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:16,986] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --job_id 1 --raw -sd DAGS_FOLDER/tutorial.py']
[2017-06-28 22:49:17,373] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,373] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-06-28 22:49:17,694] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,693] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:17,899] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,899] {models.py:1120} INFO - Dependencies not met for <TaskInstance: tutorial.python_operator2 2015-06-01 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'failed': 0, 'upstream_failed': 0, 'skipped': 0, 'done': 0}, upstream_task_ids=['python_operator1']
[2017-06-28 22:49:22,011] {jobs.py:2083} INFO - Task exited with return code 0

Upvotes: 0

Views: 8742

Answers (2)

Xuan-Tu Tran

Reputation: 11

Finally, I found an answer to my problem. Basically, I thought Airflow did lazy loading, but it seems it does not. So instead of:

t2.set_upstream(t1)

It should be:

t1.set_downstream(t2)
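For reference, the same downstream relationship can also be declared with Airflow's bitshift syntax, which reads in execution order:

t1 >> t2  # equivalent to t1.set_downstream(t2)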

Upvotes: 1

Anthony Lee

Reputation: 71

If you only want to run python_operator2 on its own, tell airflow run to skip the upstream dependency check:

airflow run tutorial python_operator2 2015-06-01 --ignore_dependencies

If you want to execute the entire DAG and run both tasks, use trigger_dag:

airflow trigger_dag tutorial
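Note that trigger_dag only creates a DAG run in the metadata database; with your SequentialExecutor/SQLite setup the scheduler generally needs to be running for the queued tasks to actually execute:

airflow scheduler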

For reference, airflow test will "run a task without checking for dependencies."

Documentation for all three commands can be found at https://airflow.incubator.apache.org/cli.html

Upvotes: 2
