Reputation: 5157
The code below works fine when I run it in the Terminal:
data = []

def _step_one():
    for i in range(10):
        data.append(i)

def _step_two():
    print(f'data: {data}')

_step_one()
_step_two()
It prints data: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
But when I run the same code in Apache Airflow, data stays empty ([]), just as it was first initialized:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

data = []

def _step_one():
    for i in range(10):
        data.append(i)

def _step_two():
    print(f'data: {data}')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='test_dag',
    default_args=default_args,
    description='Test DAG',
    schedule_interval=timedelta(minutes=3)
)

step_one = PythonOperator(
    task_id='step_one',
    python_callable=_step_one,
    dag=dag
)

step_two = PythonOperator(
    task_id='step_two',
    python_callable=_step_two,
    dag=dag
)

final_step = BashOperator(
    task_id='notify',
    bash_command='echo "Operation Completed!"',
    dag=dag
)

step_one >> step_two >> final_step
I'm new to Python, so I may be making a newbie mistake. Any ideas what I'm doing wrong?
Upvotes: 0
Views: 49
Reputation: 18824
You cannot pass data between tasks that way.
Each task runs in its own process, and tasks may even run on different machines, so they do not share module-level variables like data. If you want to pass data between tasks, you should use XCom:
https://airflow.apache.org/docs/apache-airflow/2.0.1/concepts.html#xcoms
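A minimal sketch of what that looks like for this DAG, assuming Airflow 2.x (to match the linked docs): whatever a PythonOperator callable returns is pushed to XCom automatically under the key 'return_value', and a downstream callable can pull it through the task instance (ti), which Airflow injects when it appears in the callable's signature. The dag_id and schedule below are placeholders.

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2 import path
from airflow.utils.dates import days_ago

def _step_one():
    # The return value is pushed to XCom automatically
    # under the key 'return_value'.
    return list(range(10))

def _step_two(ti):
    # `ti` (the TaskInstance) is injected by Airflow because it
    # appears in the signature; pull what step_one returned.
    data = ti.xcom_pull(task_ids='step_one')
    print(f'data: {data}')

dag = DAG(
    dag_id='test_dag_xcom',    # placeholder name
    start_date=days_ago(1),
    schedule_interval=None,
)

step_one = PythonOperator(task_id='step_one', python_callable=_step_one, dag=dag)
step_two = PythonOperator(task_id='step_two', python_callable=_step_two, dag=dag)

step_one >> step_two

Keep in mind that XCom is stored in the Airflow metadata database and is meant for small pieces of serializable data, not for shipping large datasets between tasks.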
Upvotes: 1