Nandha

Reputation: 882

How to create DAGs inside another DAG in Apache Airflow

I am trying to have a master DAG which creates further DAGs as needed. I have the following Python file inside the dags_folder configured in airflow.cfg. This code creates the master DAG in the database. The master DAG should read a text file and create a DAG for each line in the file. But the DAGs created inside the master DAG are never added to the database. What is the correct way to create them?

Version details:

Python version: 3.7

Apache-airflow version: 1.10.8

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

root_dir = "/home/user/TestSpace/airflow_check/res"

print("\n\n ===> \n Dag generator")

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),
    'concurrency': 1,
    'retries': 0
}


def greet(_name):
    message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())
    print("\n\n =====> {}\n\n".format(message))
    # Append the greeting to a file so each run leaves a trace.
    with open("{}/greetings.txt".format(root_dir), "a+") as f:
        f.write(message)


def create_dag(dag_name):
    with DAG(dag_name, default_args=default_args,
             schedule_interval='*/2 * * * *',
             catchup=False
             ) as i_dag:
        i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,
                                     op_args=["{}_{}".format("greet", dag_name)])
        i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')

        i_opr_greet >> i_echo_op
    return i_dag


def create_all_dags():
    # Read one DAG name per line; strip the trailing newline so it
    # does not end up in the dag_id, and skip blank lines.
    with open("{}/../dag_names.txt".format(root_dir), "r") as f:
        all_lines = [line.strip() for line in f if line.strip()]

    for line in all_lines:
        print("Dag creation for {}".format(line))
        globals()[line] = create_dag(line)


with DAG('master_dag', default_args=default_args,
         schedule_interval='*/1 * * * *',
         catchup=False
         ) as dag:
    echo_op = BashOperator(task_id='echo', bash_command='echo `date`')
    create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)
    echo_op >> create_op

Upvotes: 2

Views: 3304

Answers (2)

kaxil

Reputation: 18824

You have 2 options:

  1. Use SubDagOperator: Example DAG. Use it if the schedule interval can be the same as the parent DAG's; see the first sketch after this list.
  2. Write a Python DAG file: from your master DAG, create Python files containing DAGs in your AIRFLOW_HOME dags folder. You can use the Jinja2 templating engine for this; see the second sketch below.
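A minimal sketch of option 1, assuming the same default_args style as in the question. build_subdag and the 'alpha'/'beta' names are hypothetical; the one hard requirement is that the child DAG's dag_id follows the "<parent_dag_id>.<task_id>" convention SubDagOperator expects:

import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 3, 22),
}


def build_subdag(parent_dag_id, task_id, schedule_interval):
    # Hypothetical helper: the child DAG id must be "<parent_dag_id>.<task_id>".
    subdag = DAG(
        dag_id='{}.{}'.format(parent_dag_id, task_id),
        default_args=default_args,
        schedule_interval=schedule_interval,
    )
    PythonOperator(
        task_id='greet',
        python_callable=lambda: print('greetings'),
        dag=subdag,
    )
    return subdag


with DAG('master_dag', default_args=default_args,
         schedule_interval='*/2 * * * *', catchup=False) as dag:
    # One SubDagOperator per name; the names could come from your text
    # file, read here at parse time rather than inside a task.
    for name in ['alpha', 'beta']:
        SubDagOperator(
            task_id=name,
            subdag=build_subdag(dag.dag_id, name, dag.schedule_interval),
        )

Note the subdags share the parent's schedule interval, which is why this option only fits when the intervals can be the same.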
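A minimal sketch of option 2, assuming a dag_names.txt like the one in the question and a writable dags folder under AIRFLOW_HOME. DAG_TEMPLATE and the file paths are illustrative, not part of any Airflow API; the point is that each generated .py file is picked up by the scheduler on its next parse of the dags folder, so the new DAG does get registered in the database:

import os

from jinja2 import Template

DAG_TEMPLATE = Template('''\
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG('{{ dag_name }}',
         start_date=dt.datetime(2020, 3, 22),
         schedule_interval='*/2 * * * *',
         catchup=False) as dag:
    BashOperator(task_id='echo', bash_command='echo `date`')
''')

dags_folder = os.path.join(
    os.path.expanduser(os.environ.get('AIRFLOW_HOME', '~/airflow')), 'dags')

with open('dag_names.txt') as f:
    names = [line.strip() for line in f if line.strip()]

for name in names:
    # Render one standalone DAG file per name into the dags folder.
    path = os.path.join(dags_folder, '{}.py'.format(name))
    with open(path, 'w') as out:
        out.write(DAG_TEMPLATE.render(dag_name=name))

This loop could itself run inside a PythonOperator of your master DAG: unlike creating DAG objects inside a task, writing real files into the dags folder produces top-level DAG definitions the scheduler can discover.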

Upvotes: 4
