Reputation: 7146
I have a file that defines the DAG object:
dags/my_dag.py
from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

bts_dag = DAG(
    'hist_data_etl', default_args=default_args, schedule_interval='@once')
Then in another file, I import the created dag and define my tasks:
from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
ipcluster start -n {{ params.cluster }}
sleep 5
'''

start_iparallel_cluster = BashOperator(
    task_id='start_cluster',
    bash_command=template_command,
    retries=3,
    params={'cluster': NUM_ENGINES},
    dag=bts_dag)

import_hist_bts_data_task = PythonOperator(
    task_id='fetch_transform_hist_col',
    python_callable=fetch_and_transform_bts_data_col,
    op_kwargs={
        'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
    },
    dag=bts_dag)

start_iparallel_cluster >> import_hist_bts_data_task
Sanity check:
$ airflow list_dags
yields:
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl
However,
$ airflow list_tasks hist_data_etl
doesn't output any of my tasks. Somehow Airflow isn't registering the tasks as belonging to the DAG I defined in the other file.
Please help :)
Upvotes: 1
Views: 1914
Reputation: 11627
However, with some modifications to your approach, you can get this working.
first file
# dag_object_creator.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

def create_dag_object():
    bts_dag = DAG(dag_id='hist_data_etl',
                  default_args=default_args,
                  schedule_interval='@once')
    return bts_dag
second file
# tasks_creator.py

# this import statement is problematic
# from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
ipcluster start -n {{ params.cluster }}
sleep 5
'''

def create_bash_task(bts_dag):
    start_iparallel_cluster = BashOperator(
        task_id='start_cluster',
        bash_command=template_command,
        retries=3,
        params={'cluster': NUM_ENGINES},
        dag=bts_dag)
    return start_iparallel_cluster

def create_python_task(bts_dag):
    import_hist_bts_data_task = PythonOperator(
        task_id='fetch_transform_hist_col',
        python_callable=fetch_and_transform_bts_data_col,
        op_kwargs={
            'bucket': 'pilota-ml-raw-store', 'path': 'flights/', 'num_files': 1
        },
        dag=bts_dag)
    return import_hist_bts_data_task
third file
# dag_definition_file.py
import dag_object_creator
import tasks_creator
# create dag object
# stuff from 'dag_object_creator.py' could be put here directly;
# I just broke things down into separate files for clarity
bts_dag = dag_object_creator.create_dag_object()
# create tasks
start_iparallel_cluster = tasks_creator.create_bash_task(bts_dag)
import_hist_bts_data_task = tasks_creator.create_python_task(bts_dag)
# chaining tasks
start_iparallel_cluster >> import_hist_bts_data_task
The above layout of the code enforces the following behaviour:

- up front, Airflow starts parsing only dag_definition_file.py (the other two files are skipped because no DAGs are created at their global scope)
- as and when the import statements are executed, those files get parsed
- when the DAG / task creation statements are executed, the DAG and task objects respectively get created in the global scope of dag_definition_file.py

Therefore everything falls into place nicely, and this implementation should work (not tested, but based on anecdotal knowledge).
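To double-check that the tasks really do get attached at parse time, a minimal sketch along these lines could help (assuming Airflow 1.10's DagBag API and that dag_definition_file.py lives in your configured dags folder; check_tasks.py is just a hypothetical name for the snippet):

# check_tasks.py -- hypothetical verification snippet, not one of the files above
from airflow.models import DagBag

# DagBag parses every file in the configured dags folder
dagbag = DagBag()

dag = dagbag.get_dag('hist_data_etl')
print(dag.task_ids)  # should include 'start_cluster' and 'fetch_transform_hist_col'

If the two task ids don't show up here, the file that defines them was most likely never parsed.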
Suggested reads
Upvotes: 2