user5491439

Reputation:

Reading a yaml configuration file and creating a DAG generator in Airflow 2.0

I am new to Airflow 2.0 and really struggling to find a way to save my DAG remotely instead of having it submitted to the scheduler automatically. I have a config file which loads the settings for my Spark job.

I am trying to write a utility Python file which reads the configuration file, parses it, and creates a DAG file. I have done this using Astronomer's create_dag example, but that submits the DAG directly, and there is no way for me to see the generated DAG code except in the UI.

  1. How can I save the DAG to a file instead and submit it later?
  2. Also, is it possible for my utility to use some sort of templating that includes the operators and params I need, so it can create and save the DAG file remotely for me to submit later? (Without this, I created a sample DAG with hardcoded values, but instead I want a utility that does this for me and saves the file remotely.)
  3. Any examples?

Upvotes: 0

Views: 9765

Answers (1)

Alan Ma

Reputation: 591

I am assuming you are referring to this guide on Dynamically Generating DAGs in Airflow.

  1. One way to "save a DAG file", instead of having Airflow dynamically create the DAG, is to generate the file beforehand. For example, you can add a step to your CI/CD pipeline that runs a script to generate your Python file and then pushes it to the scheduler.

  2. This process can be described as preparing and rendering a template.

    You can use Jinja to accomplish this.

    Fun fact: Airflow also uses Jinja to build its web pages, and it lets you leverage Jinja templating to render files and parameters in your own DAGs!

  3. The following example should get you started.

generate_file.py

from jinja2 import Environment, FileSystemLoader
import os

file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))

template = env.get_template('dag.template')

# I don't know what your configuration format is, but as long as you can
# convert it to a dictionary, this will work.
values = {}

filename = os.path.join(file_dir, 'dag.py')
with open(filename, 'w') as fh:
    fh.write(template.render(
        dag_id="my_dag",
        num_task=2,
        **values
    ))
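
Since the question mentions a YAML configuration file, here is a minimal sketch of filling values from YAML instead of using the empty dictionary above. This assumes PyYAML is installed, and config.yaml is a hypothetical file name; adjust both to your setup.

import os
import yaml  # PyYAML

file_dir = os.path.dirname(os.path.abspath(__file__))

# Hypothetical config file sitting next to the script.
with open(os.path.join(file_dir, 'config.yaml')) as f:
    values = yaml.safe_load(f) or {}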

dag.template

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def my_task():
    # Placeholder callable so the generated file parses and runs;
    # replace with your real task logic (e.g. the Spark submission).
    print("running task")

dag = DAG(
    dag_id='{{ dag_id }}',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

with dag:
    dummy = DummyOperator(
        task_id='test',
        dag=dag
    )
{% for n in range(num_task) %}
    op_{{ loop.index }} = PythonOperator(
        task_id='python_op_{{ loop.index }}',
        python_callable=my_task,
        dag=dag
    )
{% endfor %}

    op_1 >> op_2
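
Note that the chaining line above only matches num_task=2. If you want the template itself to chain however many tasks it generates, one possible sketch (not part of the original template) is a Jinja loop:

{% for n in range(num_task - 1) %}
    op_{{ n + 1 }} >> op_{{ n + 2 }}
{% endfor %}

With num_task=2 this renders exactly op_1 >> op_2.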

With num_task=2, the resulting file dag.py will look like this.

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def my_task():
    # Placeholder callable so the generated file parses and runs;
    # replace with your real task logic (e.g. the Spark submission).
    print("running task")

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

with dag:
    dummy = DummyOperator(
        task_id='test',
        dag=dag
    )

    op_1 = PythonOperator(
        task_id='python_op_1',
        python_callable=my_task,
        dag=dag
    )

    op_2 = PythonOperator(
        task_id='python_op_2',
        python_callable=my_task,
        dag=dag
    )
    
    op_1 >> op_2
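
Once dag.py is generated, deploying it is just a matter of placing the file in the scheduler's DAGs folder (for example, as the CI/CD step described in point 1); Airflow will pick it up on its next parse of that directory.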

Upvotes: 2
