Reputation: 882
I am trying to have a master dag which will create further dags based on my need. I have the following python file inside the dags_folder in airflow.cfg. This code creates the master dag in database. This master dag should read a text file and should create dags for each line in the text file. But the dags created inside the master dag are not added to the database. What is the correct way to create it?
Version details:
Python version: 3.7
Apache-airflow version: 1.10.8
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
root_dir = "/home/user/TestSpace/airflow_check/res"
print("\n\n ===> \n Dag generator")
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),
'concurrency': 1,
'retries': 0
}
def greet(_name):
message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())
f = open("{}/greetings.txt".format(root_dir), "a+")
print("\n\n =====> {}\n\n".format(message))
f.write(message)
f.close()
def create_dag(dag_name):
with DAG(dag_name, default_args=default_args,
schedule_interval='*/2 * * * *',
catchup=False
) as i_dag:
i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,
op_args=["{}_{}".format("greet", dag_name)])
i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
i_opr_greet >> i_echo_op
return i_dag
def create_all_dags():
all_lines = []
f = open("{}/../dag_names.txt".format(root_dir), "r")
for x in f:
all_lines.append(str(x))
f.close()
for line in all_lines:
print("Dag creation for {}".format(line))
globals()[line] = create_dag(line)
with DAG('master_dag', default_args=default_args,
schedule_interval='*/1 * * * *',
catchup=False
) as dag:
echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)
echo_op >> create_op
Upvotes: 2
Views: 3304
Reputation: 18824
You have 2 options:
Upvotes: 4
Reputation: 1174
Have a look at the TriggerDagRunOperator: https://airflow.apache.org/docs/stable/_api/airflow/operators/dagrun_operator/index.html
Example usage:
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
Upvotes: 1