Cyzanfar

Reputation: 7146

Airflow DAG does not recognize tasks

I have a file that defines the DAG object:

dags/my_dag.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

bts_dag = DAG(
    'hist_data_etl', default_args=default_args, schedule_interval='@once')

Then in another file, I import the created DAG and define my tasks:

from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start -n {{ params.cluster }}
    sleep 5
'''

start_iparallel_cluster = BashOperator(
    task_id='start_cluster',
    bash_command=template_command,
    retries=3,
    params={'cluster': NUM_ENGINES},
    dag=bts_dag)


import_hist_bts_data_task = PythonOperator(
    task_id='fetch_transform_hist_col',
    python_callable=fetch_and_transform_bts_data_col,
    op_kwargs={
        'bucket': 'some-bucket', 'path': 'hello/', 'num_files': 1
    },
    dag=bts_dag)

start_iparallel_cluster >> import_hist_bts_data_task

sanity check:

$ airflow list_dags

yields:

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hist_data_etl

however

$ airflow list_tasks hist_data_etl

doesn't output any of my tasks. Somehow Airflow isn't registering the tasks as belonging to the DAG I defined in the other file.

Please help :)

Upvotes: 1

Views: 1914

Answers (1)

y2k-shubham

Reputation: 11627

  • Because of the way DAG-file parsing works in Airflow, I don't expect this to work
  • Even though I don't have a complete picture of the internals, Airflow spawns child processes to parse DAG-definition files (files identified by some heuristics). Each process parses a different subset of files => it is likely that different files are processed by different processes (a rough sketch follows this list)
  • I believe that in your implementation, the logical order of parsing the files (DAG file first, then task file) is not preserved, and therefore things don't work
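
To illustrate why the split matters, here is a rough, hypothetical sketch (not Airflow's actual source) of what DAG discovery boils down to: each candidate file is imported as a module, and only DAG objects found at that module's global scope are collected.

# hypothetical sketch of DAG discovery -- illustration only, not Airflow internals
import importlib.util

from airflow import DAG

def collect_dags_from_file(filepath):
    # import the candidate file and run it top to bottom
    spec = importlib.util.spec_from_file_location('candidate_dag_module', filepath)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # keep only the DAG objects that ended up at module (global) scope
    return {name: obj for name, obj in vars(module).items() if isinstance(obj, DAG)}

# dags/my_dag.py is picked up by a scan like this, but at the moment it is
# parsed it carries no tasks -- nothing guarantees that the file attaching the
# tasks has been executed against that same DAG object.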

However, with some modification to your approach, you can get this working:

first file

# dag_object_creator.py

from airflow import DAG
from datetime import datetime

default_args = {
    'owner': 'pilota',
    'depends_on_past': False,
    'start_date': datetime(2019, 10, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

def create_dag_object():
    bts_dag = DAG(dag_id='hist_data_etl',
                  default_args=default_args,
                  schedule_interval='@once')
    return bts_dag

second file

# tasks_creator.py

# this import statement is problematic
# from ingestion.airflow_home.dags.my_dag import bts_dag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from ingestion.datatransformer import fetch_and_transform_bts_data_col

NUM_ENGINES = 4

template_command = '''
    ipcluster start -n {{ params.cluster }}
    sleep 5
'''

def create_bash_task(bts_dag):
    start_iparallel_cluster = BashOperator(
        task_id='start_cluster',
        bash_command=template_command,
        retries=3,
        params={'cluster': NUM_ENGINES},
        dag=bts_dag)
    return start_iparallel_cluster


def create_python_task(bts_dag):
    import_hist_bts_data_task = PythonOperator(
        task_id='fetch_transform_hist_col',
        python_callable=fetch_and_transform_bts_data_col,
        op_kwargs={
            'bucket': 'pilota-ml-raw-store', 'path': 'flights/', 'num_files': 1
        },
        dag=bts_dag)
    return import_hist_bts_data_task

third file

# dag_definition_file.py

import dag_object_creator
import tasks_creator

# create dag object
# stuff from 'dag_object_creator.py' can be put here directly,
# I just broke things down for clarity
bts_dag = dag_object_creator.create_dag_object()

# create tasks
start_iparallel_cluster = tasks_creator.create_bash_task(bts_dag)
import_hist_bts_data_task = tasks_creator.create_python_task(bts_dag)

# chaining tasks
start_iparallel_cluster >> import_hist_bts_data_task

The above layout of code enforces the following behaviour

  • up front, the parsing process picks up only dag_definition_file.py (the other two files are skipped because they create no DAGs at global scope)

  • as and when the import statements are executed, those two files get parsed

  • when the DAG / task creation statements are executed, the DAG and task objects respectively get created in the global scope of dag_definition_file.py

Therefore everything falls into place nicely, and this implementation should work (not tested, but based on anecdotal knowledge); a quick way to confirm is sketched below
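
As a quick sanity check (a minimal sketch, assuming dag_definition_file.py lives in your configured dags folder), you can build a DagBag in a Python shell and inspect which tasks got attached:

# sketch only: parse the dags folder and list the tasks of 'hist_data_etl'
from airflow.models import DagBag

dag_bag = DagBag()  # parses the files under the configured dags folder
bts_dag = dag_bag.get_dag('hist_data_etl')
print(bts_dag.task_ids)  # expected: ['start_cluster', 'fetch_transform_hist_col']

or simply re-run airflow list_tasks hist_data_etl, which should now list both tasks.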



Upvotes: 2
