Reputation: 11
I am trying to use Apache Airflow to create a workflow. I installed Airflow manually in my own Anaconda environment on a server.
Here is the way I run a simple DAG:
export AIRFLOW_HOME=~/airflow/airflow_home # my airflow home
export AIRFLOW=~/.conda/.../lib/python2.7/site-packages/airflow/bin
export PATH=~/.conda/.../bin:$AIRFLOW:$PATH # my kernel
When I do the same thing using airflow test, it works for a particular task independently. For example, in dag1: task1 >> task2
airflow test dag1 task2 2017-06-22
I expected it to run task1 first and then task2, but it just runs task2 on its own.
Do you guys have any idea about this? Thank you very much in advance!
Here is my code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'txuantu',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

def python_op1(ds, **kwargs):
    print(ds)
    return 0

def python_op2(ds, **kwargs):
    print(str(kwargs))
    return 0

# t1, t2 and t3 are examples of tasks created by instantiating operators
# t1 = BashOperator(
#     task_id='bash_operator',
#     bash_command='echo {{ ds }}',
#     dag=dag)
t1 = PythonOperator(
    task_id='python_operator1',
    python_callable=python_op1,
    # provide_context=True,
    dag=dag)
t2 = PythonOperator(
    task_id='python_operator2',
    python_callable=python_op2,
    # provide_context=True,
    dag=dag)
t2.set_upstream(t1)
Airflow: v1.8.0, using the SequentialExecutor with SQLite
airflow run tutorial python_operator2 2015-06-01
Here is the error message:
[2017-06-28 22:49:15,336] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags
[2017-06-28 22:49:16,069] {base_executor.py:50} INFO - Adding to queue: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,072] {sequential_executor.py:40} INFO - Executing command: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,765] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:16,986] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --job_id 1 --raw -sd DAGS_FOLDER/tutorial.py']
[2017-06-28 22:49:17,373] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,373] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-06-28 22:49:17,694] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,693] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:17,899] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,899] {models.py:1120} INFO - Dependencies not met for <TaskInstance: tutorial.python_operator2 2015-06-01 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'failed': 0, 'upstream_failed': 0, 'skipped': 0, 'done': 0}, upstream_task_ids=['python_operator1']
[2017-06-28 22:49:22,011] {jobs.py:2083} INFO - Task exited with return code 0
Upvotes: 0
Views: 8742
Reputation: 11
Finally, I found an answer to my problem. I had assumed Airflow loads tasks lazily, but it seems it does not. So instead of:
t2.set_upstream(t1)
It should be:
t1.set_downstream(t2)
Upvotes: 1
Reputation: 71
If you only want to run python_operator2, you should execute:
airflow run tutorial python_operator2 2015-06-01 --ignore_dependencies
If you want to execute the entire DAG, so that both tasks run in order, use trigger_dag:
airflow trigger_dag tutorial
For reference, airflow test will "run a task without checking for dependencies."
Documentation for all three commands can be found at https://airflow.incubator.apache.org/cli.html
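To make the difference between the three commands concrete, here is a rough sketch in plain Python. It is a toy model, not Airflow's actual code: UPSTREAM, run_task and run_dag are hypothetical names I made up, and the only thing borrowed from the question is the two task ids and the 'all_success' behaviour shown in the log.

```python
# Toy model only: this is NOT how Airflow is implemented.
# UPSTREAM, run_task and run_dag are hypothetical names for illustration.

# Each task id maps to the task ids that must succeed before it may run.
UPSTREAM = {
    "python_operator1": [],
    "python_operator2": ["python_operator1"],
}


def run_task(task_id, states, ignore_dependencies=False):
    """Try to run a single task; return True if it ran.

    Roughly mimics an 'all_success' check: the task is refused when any
    upstream task has not succeeded, unless dependencies are ignored.
    """
    unmet = [u for u in UPSTREAM[task_id] if states.get(u) != "success"]
    if unmet and not ignore_dependencies:
        return False
    states[task_id] = "success"
    return True


def run_dag(states):
    """Run every task, always picking one whose upstream tasks succeeded."""
    order = []
    pending = set(UPSTREAM)
    while pending:
        ready = sorted(
            t for t in pending
            if all(states.get(u) == "success" for u in UPSTREAM[t])
        )
        if not ready:
            raise ValueError("cycle or unmet dependency")
        task_id = ready[0]
        run_task(task_id, states)
        order.append(task_id)
        pending.remove(task_id)
    return order


if __name__ == "__main__":
    # Running only the downstream task: refused, upstream never ran.
    print(run_task("python_operator2", {}))
    # Same task with dependencies ignored: it runs on its own.
    print(run_task("python_operator2", {}, ignore_dependencies=True))
    # Running the whole DAG: upstream first, then downstream.
    print(run_dag({}))
```

In this model, running python_operator2 alone corresponds to the "Dependencies not met" case from the question, ignoring dependencies corresponds to what airflow test does, and run_dag corresponds to triggering the whole DAG.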
Upvotes: 2