I am trying to make a DAG that takes a conf parameter (a file name) and parses it for use downstream in the DAG, for example turning something.txt into something, to handle the file-name transformations I have in the pipeline.
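The file-name transformation itself is a standard-library call. A minimal sketch, assuming only the last extension should be dropped; strip_extension is a hypothetical helper name, and the extra os.path.basename call (not in the question's code) also discards any directory part:

```python
import os

def strip_extension(file_name):
    # Hypothetical helper: drop any directory part, then only the final
    # extension, e.g. "something.txt" -> "something".
    return os.path.splitext(os.path.basename(file_name))[0]

print(strip_extension("something.txt"))                     # something
print(strip_extension("/input/dchen71/dog/something.txt"))  # something
print(strip_extension("archive.tar.gz"))                    # archive.tar
```

Note that os.path.splitext only removes the last suffix, so multi-part extensions such as .tar.gz keep their first part.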
I am getting: airflow.exceptions.AirflowException: Task is missing the start_date parameter
so I am wondering whether I am on the right track, and whether anyone has suggestions, since I cannot hard-code these values via Variable. I plan to run this DAG from a for loop in bash, passing each file name in through the conf parameter.
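For the bash for-loop plan, each iteration would trigger one run of the DAG with its own conf. A sketch that only builds the command lines, assuming the Airflow 1.x CLI form airflow trigger_dag -c CONF dag_id (Airflow 2.x uses airflow dags trigger instead); trigger_commands is a hypothetical helper:

```python
import json
import shlex

DAG_ID = "test_multiparameters"  # dag_id from the question

def trigger_commands(file_names, dag_id=DAG_ID):
    # Hypothetical helper: one `airflow trigger_dag` call per file,
    # with the file name passed through the run's conf as JSON.
    cmds = []
    for name in file_names:
        conf = json.dumps({"file_name": name})
        cmds.append(["airflow", "trigger_dag", "-c", conf, dag_id])
    return cmds

for cmd in trigger_commands(["something.txt", "other.txt"]):
    # Shell-quoted form, i.e. what the bash loop would run; pass `cmd`
    # to subprocess.run(cmd, check=True) to execute it instead.
    print(" ".join(shlex.quote(part) for part in cmd))
# first line: airflow trigger_dag -c '{"file_name": "something.txt"}' test_multiparameters
```

Building the JSON with json.dumps avoids hand-quoting the conf string in bash.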
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime
import os

# Basic arguments to pass to Airflow
args = {
    'owner': 'airflow',
    'start_date': datetime.now(),
}

# Create the head dag
dag = DAG(
    dag_id='test_multiparameters',
    default_args=args,
    schedule_interval=None)

input_dir = '/input/dchen71/dog/'
templated_command1 = """
touch /input/dchen71/dog/{{ dag_run.conf['file_name'] }}
"""

# Touch data file
touchy = BashOperator(
    task_id='create_data',
    bash_command=templated_command1,
    dag=dag)

def decat(**kwargs):
    # Generator function for dynamic dags
    base_name = os.path.splitext(context['dag_run'].conf['file_name'])[0]
    cat_template = """
cat {input_dir}/{base_name}.txt {input_dir}/{base_name}.txt > {input_dir}/meow.txt
""".format(input_dir = input_dir, base_name = base_name)
    return BashOperator(
        task_id = "create_data",
        bash_command = cat_template,
        dag = dag
    )

dacat = PythonOperator(
    task_id="dacat",
    python_callable = decat,
    provides_context = True
)

touchy >> dacat
You forgot to set dag=dag in the PythonOperator. That is why you get the exception.
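Beyond the missing dag=dag, the callable in the question has further problems: it reads an undefined name context instead of kwargs, provides_context is misspelled (the Airflow 1.x parameter is provide_context), and it returns a new BashOperator (reusing the task_id "create_data"), which is never scheduled because it is created inside a task that is already running. A minimal sketch of a callable that does the work itself; the input_dir keyword argument is a hypothetical addition for local testing, the concatenation is done in plain Python instead of shelling out to cat, and SimpleNamespace stands in for the real DagRun:

```python
import os
import tempfile
from types import SimpleNamespace

INPUT_DIR = "/input/dchen71/dog"  # directory taken from the question

def decat(input_dir=INPUT_DIR, **kwargs):
    # With provide_context=True (Airflow 1.x), the task context arrives as
    # keyword arguments, so the DagRun is kwargs["dag_run"].
    file_name = kwargs["dag_run"].conf["file_name"]
    base_name = os.path.splitext(file_name)[0]
    src = os.path.join(input_dir, base_name + ".txt")
    dst = os.path.join(input_dir, "meow.txt")
    # Do the work here instead of returning an operator; this mirrors
    # `cat src src > dst` from the question's template.
    with open(src, "rb") as f:
        data = f.read()
    with open(dst, "wb") as out:
        out.write(data + data)
    return dst

# Wiring in the DAG file (Airflow 1.x), with dag=dag added:
# dacat = PythonOperator(task_id="dacat", python_callable=decat,
#                        provide_context=True, dag=dag)

# Local check without Airflow, using a stand-in DagRun.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "something.txt"), "w") as f:
        f.write("meow\n")
    run = SimpleNamespace(conf={"file_name": "something.txt"})
    out_path = decat(input_dir=tmp, dag_run=run)
    with open(out_path) as f:
        result = f.read()
print(result, end="")  # meow, printed twice
```

If you prefer to keep the shell command, a plain BashOperator with a Jinja-templated bash_command (as already done for touchy) avoids the PythonOperator entirely.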
I am not sure what you mean by "for loop it in bash", but to dynamically generate DAGs based on some parameters, I would do the for-loop directly in the Python script. You may want to read this for more info.
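A sketch of the for-loop-in-Python approach, one DAG per file. The DAG class below is a hypothetical stand-in for airflow.models.DAG so the snippet runs without Airflow, and FILE_NAMES, create_dag, and the fixed start date are illustrative assumptions. The important part is binding each DAG to its own module-level name, since Airflow only discovers DAG objects that appear in the module's global namespace:

```python
import os
from datetime import datetime

class DAG:
    # Stand-in for airflow.models.DAG so the sketch runs without Airflow;
    # in a real DAG file, import DAG from airflow.models instead.
    def __init__(self, dag_id, default_args=None, schedule_interval=None):
        self.dag_id = dag_id
        self.default_args = default_args or {}
        self.schedule_interval = schedule_interval

FILE_NAMES = ["something.txt", "other.txt"]  # hypothetical inputs

def create_dag(dag_id, file_name):
    # Hypothetical factory: build one DAG for one file.
    dag = DAG(
        dag_id=dag_id,
        default_args={"owner": "airflow", "start_date": datetime(2017, 1, 1)},
        schedule_interval=None,
    )
    # ...create tasks here with dag=dag, using file_name directly
    # instead of reading it from dag_run.conf.
    return dag

for file_name in FILE_NAMES:
    dag_id = "process_" + os.path.splitext(file_name)[0]
    # Each DAG needs its own global name to be picked up by the scheduler.
    globals()[dag_id] = create_dag(dag_id, file_name)
```

This trades the conf parameter for a fixed list known when the file is parsed; it suits inputs that are known in advance, while the conf approach suits ad hoc runs.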
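Note: it is not recommended to set 'start_date': datetime.now() [1], because the DAG file is re-parsed periodically and a dynamic start date keeps moving; use a fixed date instead.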
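A minimal sketch of the fixed-date alternative, assuming any date in the past works for this manually triggered DAG (2017-01-01 is an arbitrary placeholder). The function simulates the file being parsed more than once and shows the value staying the same:

```python
from datetime import datetime

def default_args():
    # Each call simulates one parse of the DAG file; a fixed start_date
    # evaluates to the same value every time, unlike datetime.now().
    return {
        "owner": "airflow",
        "start_date": datetime(2017, 1, 1),  # placeholder fixed date
    }

first_parse = default_args()
second_parse = default_args()
print(first_parse["start_date"] == second_parse["start_date"])  # True
```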