dchen71

Reputation: 170

Dynamic Dag with Conf Variables in Airflow

I am trying to build a DAG that takes a conf parameter containing a file name and parses it for use downstream in the DAG. For example, something.txt would be parsed into something, which I need for the file-name transformations in my pipeline.
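The parsing step itself is plain Python. A minimal sketch, where base_name is a hypothetical helper name:

```python
import os


def base_name(file_name):
    """Strip any directory part and the last extension from a file name."""
    # os.path.basename drops leading directories; os.path.splitext splits off
    # only the final extension
    return os.path.splitext(os.path.basename(file_name))[0]
```

For example, base_name('something.txt') returns 'something'. Because splitext removes only the last extension, 'data.tar.gz' becomes 'data.tar'.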

I am getting airflow.exceptions.AirflowException: Task is missing the start_date parameter, so I am wondering whether I am on the right track and whether anyone has suggestions, since I cannot hard-code these values via Variable. I plan to run this DAG from a for loop in bash, passing each file name in as a conf parameter.
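The triggering loop described above could also be driven from Python. This is a sketch assuming the Airflow 1.x CLI (airflow trigger_dag -c CONF DAG_ID); in Airflow 2.x the command is airflow dags trigger. The helper names trigger_cmd and trigger_all are hypothetical.

```python
import json
import subprocess

DAG_ID = 'test_multiparameters'


def trigger_cmd(file_name, dag_id=DAG_ID):
    """Build the CLI call that starts one run with file_name in dag_run.conf."""
    # Airflow 1.x syntax; Airflow 2.x uses `airflow dags trigger` instead
    return ['airflow', 'trigger_dag', '-c', json.dumps({'file_name': file_name}), dag_id]


def trigger_all(file_names, dry_run=True):
    """Trigger one DAG run per file; with dry_run=True only return the commands."""
    cmds = [trigger_cmd(f) for f in file_names]
    if not dry_run:
        for cmd in cmds:
            # Passing an argument list avoids shell-quoting the JSON payload by hand
            subprocess.run(cmd, check=True)
    return cmds
```

Calling trigger_all(['something.txt', 'other.txt'], dry_run=False) would start two runs, each of which can read its own name via dag_run.conf['file_name'].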

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
import os

# Basic arguments to pass to Airflow
args = {
    'owner': 'airflow',
    'start_date': datetime.now(),
}

# Create the head dag
dag = DAG(
    dag_id='test_multiparameters', 
    default_args=args,
    schedule_interval=None)

input_dir = '/input/dchen71/dog/'

templated_command1 = """
touch /input/dchen71/dog/{{ dag_run.conf['file_name'] }} 
"""

# Touch data file 
touchy = BashOperator(
    task_id='create_data', 
    bash_command=templated_command1, 
    dag=dag)

def decat(**kwargs):
    # Generator function for dynamic dags
    base_name = os.path.splitext(kwargs['dag_run'].conf['file_name'])[0]

    cat_template = """
    cat {input_dir}/{base_name}.txt {input_dir}/{base_name}.txt > {input_dir}/meow.txt
    """.format(input_dir = input_dir, base_name = base_name)
    return BashOperator(
        task_id = "cat_data",
        bash_command = cat_template,
        dag = dag
        )

dacat = PythonOperator(
    task_id="dacat",
    python_callable = decat,
    provide_context = True
    )


touchy >> dacat

Upvotes: 0

Views: 2191

Answers (1)

SergiyKolesnikov

Reputation: 7815

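You forgot to set dag=dag in the PythonOperator. That is why you get the exception: an operator that is not attached to a DAG does not inherit the DAG's default_args, so it never receives a start_date.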
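A related issue: returning a BashOperator from a python_callable does not schedule that operator; the callable simply runs, and an operator created inside it is never executed. A minimal sketch of a callable that does the concatenation itself, assuming Airflow 1.x with provide_context=True (so the context, including dag_run, arrives as keyword arguments). The input_dir parameter is a hypothetical addition for flexibility.

```python
import os
import shutil

INPUT_DIR = '/input/dchen71/dog'


def decat(input_dir=INPUT_DIR, **kwargs):
    """Write <base_name>.txt twice into meow.txt (like `cat a a > meow.txt`)."""
    # With provide_context=True, Airflow 1.x passes the task context as kwargs
    file_name = kwargs['dag_run'].conf['file_name']
    base_name = os.path.splitext(file_name)[0]
    src = os.path.join(input_dir, base_name + '.txt')
    dst = os.path.join(input_dir, 'meow.txt')
    with open(dst, 'wb') as out:
        for _ in range(2):
            with open(src, 'rb') as f:
                shutil.copyfileobj(f, out)
    return dst

# Wiring in the DAG file (note dag=dag):
# dacat = PythonOperator(task_id='dacat', python_callable=decat,
#                        provide_context=True, dag=dag)
```

Doing the work in Python also sidesteps the spacing problems that are easy to introduce when assembling a bash command string with format().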

I am not sure what you mean by "for loop it in bash", but to dynamically generate DAGs based on some parameters, I would do the for-loop directly in the Python script. You may want to read this for more info.
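A sketch of that approach, assuming Airflow 1.x import paths: one DAG per file, each stored in the module's globals() under a unique dag_id so the scheduler can discover it. FILE_NAMES, build_dag and register_dags are hypothetical names.

```python
import os
from datetime import datetime

# Hypothetical inputs; these could instead come from os.listdir(INPUT_DIR)
FILE_NAMES = ['something.txt', 'other.txt']
INPUT_DIR = '/input/dchen71/dog'


def build_dag(dag_id, file_name):
    """Build one Airflow 1.x DAG that handles a single input file."""
    # Imported here so register_dags() itself has no Airflow dependency
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG(
        dag_id=dag_id,
        default_args={'owner': 'airflow', 'start_date': datetime(2018, 1, 1)},
        schedule_interval=None)

    base_name = os.path.splitext(file_name)[0]
    src = os.path.join(INPUT_DIR, base_name + '.txt')
    dst = os.path.join(INPUT_DIR, 'meow.txt')

    touchy = BashOperator(task_id='create_data',
                          bash_command='touch ' + os.path.join(INPUT_DIR, file_name),
                          dag=dag)
    dacat = BashOperator(task_id='cat_data',
                         bash_command='cat {0} {0} > {1}'.format(src, dst),
                         dag=dag)
    touchy >> dacat
    return dag


def register_dags(namespace, file_names, factory=build_dag):
    """Create one DAG per file and store it under a unique dag_id."""
    for file_name in file_names:
        base_name = os.path.splitext(os.path.basename(file_name))[0]
        dag_id = 'test_multiparameters_' + base_name
        namespace[dag_id] = factory(dag_id, file_name)
    return namespace

# In the DAG file itself, this module-level call exposes every generated DAG:
# register_dags(globals(), FILE_NAMES)
```

The trade-off compared with conf is that the file list is fixed when the DAG file is parsed, but each file then gets its own DAG and run history.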

Note that setting 'start_date': datetime.now() is not recommended[1]; use a fixed date instead, e.g. datetime(2018, 1, 1).

Upvotes: 2
