I am new to Airflow 2.0 and really struggling to find a way to save my DAG file remotely instead of having it submitted to the scheduler automatically. I have a config file which loads the settings for my Spark job.
I am trying to write a utility Python file which reads the configuration file, parses it, and creates a DAG file. I have done it using Astronomer's create_dag example, but it submits the DAG directly, and there's no way for me to see the generated DAG code except in the UI.
Upvotes: 0
Views: 9765
Reputation: 591
I am assuming you are referring to this guide on Dynamically Generating DAGs in Airflow.
One way to "save a DAG file", instead of having Airflow dynamically create the DAG, is to generate the file beforehand. For example, you can add a step to your CI/CD pipeline that runs a script to generate your Python file and then pushes it to the scheduler.
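Such a deployment step might look like the following. This is a minimal sketch: generate_file.py is the generator script shown further below, deploy_dag.py is a hypothetical name, and the DAGs-folder path is an assumption about your setup.
deploy_dag.py
import os
import shutil
import subprocess

# Where your scheduler picks up DAG files -- adjust to your deployment.
AIRFLOW_DAGS = os.environ.get("AIRFLOW_DAGS", "/opt/airflow/dags")

# 1. Render dag.py from the template (see generate_file.py below).
subprocess.run(["python", "generate_file.py"], check=True)

# 2. Sanity-check that the generated file at least compiles.
subprocess.run(["python", "-m", "py_compile", "dag.py"], check=True)

# 3. "Push" the file to the scheduler by copying it into the DAGs folder.
shutil.copy("dag.py", AIRFLOW_DAGS)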
The generation step itself can be described as preparing and rendering a template.
You can use Jinja to accomplish this.
Fun fact: Airflow also uses Jinja to build its web pages, and it lets you leverage Jinja templating to render files and parameters!
The following example should get you started.
generate_file.py
from jinja2 import Environment, FileSystemLoader
import os

file_dir = os.path.dirname(os.path.abspath(__file__))

# trim_blocks drops the newline after {% ... %} tags so the rendered
# file stays free of stray blank lines.
env = Environment(loader=FileSystemLoader(file_dir), trim_blocks=True)
template = env.get_template('dag.template')

# I don't know what format your configuration is in, but as long as you
# can convert it to a dictionary, this will work.
values = {}

filename = os.path.join(file_dir, 'dag.py')
with open(filename, 'w') as fh:
    fh.write(template.render(
        dag_id="my_dag",
        num_task=2,
        **values
    ))
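Since your settings live in a config file, the values dictionary above is where they plug in. For example, if the config were JSON (config.json is a hypothetical name; any format that parses into a dictionary works), the loading step could look like this:
import json
import os

file_dir = os.path.dirname(os.path.abspath(__file__))

# Hypothetical: parse the Spark job settings into a dict, then pass
# them to template.render() via **values as shown above.
with open(os.path.join(file_dir, 'config.json')) as f:
    values = json.load(f)  # e.g. {"num_task": 3}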
dag.template
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


# PythonOperator requires a python_callable; a no-op placeholder keeps
# the generated file importable.
def placeholder():
    pass


dag = DAG(
    dag_id='{{ dag_id }}',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

with dag:
    dummy = DummyOperator(
        task_id='test',
        dag=dag
    )
{% for n in range(num_task) %}
    op_{{ loop.index }} = PythonOperator(
        task_id='python_op_{{ loop.index }}',
        python_callable=placeholder,
        dag=dag
    )
{% endfor %}
    dummy{% for n in range(num_task) %} >> op_{{ loop.index }}{% endfor %}
The resulting file dag.py will look like this.
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


# PythonOperator requires a python_callable; a no-op placeholder keeps
# the generated file importable.
def placeholder():
    pass


dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2020, 1, 1)
)

with dag:
    dummy = DummyOperator(
        task_id='test',
        dag=dag
    )
    op_1 = PythonOperator(
        task_id='python_op_1',
        python_callable=placeholder,
        dag=dag
    )
    op_2 = PythonOperator(
        task_id='python_op_2',
        python_callable=placeholder,
        dag=dag
    )
    dummy >> op_1 >> op_2
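Before pushing the generated file anywhere, you can also check that Airflow is able to import it. A minimal sketch using Airflow's DagBag, run from the directory containing dag.py (check_dag.py is a hypothetical name):
check_dag.py
from airflow.models import DagBag

# Parse only the current directory, skipping Airflow's bundled examples.
bag = DagBag(dag_folder='.', include_examples=False)

assert not bag.import_errors, bag.import_errors
assert 'my_dag' in bag.dags
print("dag.py parsed OK with tasks:", bag.dags['my_dag'].task_ids)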
Upvotes: 2