Reputation: 1
I am creating DAGs dynamically using metadata that is stored in a table.
I read the table and build a list of dictionaries that looks like the one below.
This is sample data; the table has other columns as well.
metadata = [
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 2, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 1, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 2, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src3', 'table_name': 'table3', 'dag_group_number': 2, 'job_schedule_cron': '0 10 * * *'}
]
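Since the same (source_system, dag_group_number) pair can appear in several rows, one way to collapse the metadata into one DAG per group is a small dedup pass. This is a sketch; the dag_id naming scheme is illustrative, not from the original setup:

```python
metadata = [
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 2, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 1, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 2, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src3', 'table_name': 'table3', 'dag_group_number': 2, 'job_schedule_cron': '0 10 * * *'},
]

# one cron schedule per (source_system, dag_group_number) pair;
# later rows simply overwrite earlier duplicates with the same schedule
dag_defs = {}
for row in metadata:
    dag_id = "{}_group_{}".format(row['source_system'], row['dag_group_number'])
    dag_defs[dag_id] = row['job_schedule_cron']

print(dag_defs)  # 5 unique DAG ids, each mapped to a single cron string
```

Iterating over `dag_defs.items()` then gives you exactly one (dag_id, schedule) pair per DAG to register.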
After generating the list, I iterate over it and register the DAGs via globals().
Here is sample code from Astronomer. Link
from pendulum import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def create_dag(dag_id, schedule, dag_number, default_args):
    def hello_world_py():
        print("Hello World")
        print("This is DAG: {}".format(str(dag_number)))

    generated_dag = DAG(dag_id, schedule=schedule, default_args=default_args)

    with generated_dag:
        PythonOperator(task_id="hello_world", python_callable=hello_world_py)

    return generated_dag


# build a dag for each number in range(1, 4)
for n in range(1, 4):
    dag_id = "loop_hello_world_{}".format(str(n))
    default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}
    schedule = "@daily"
    dag_number = n

    globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
My question is: how do I set the schedule dynamically? I have the schedule defined in the metadata table for each source, and I want to use that in the DAG as well.
If I use

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval=metadata[source_system][job_schedule_cron],
    dagrun_timeout=timedelta(minutes=1))
I am getting errors:
AirflowTimetableInvalid: Exactly 5, 6 or 7 columns has to be specified for iterator expression
and
AirflowTimetableInvalid: ['0 7 * * *'] is not acceptable
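For what it's worth, the second error message shows a list (['0 7 * * *']) being passed where Airflow expects a plain cron string, which is what a filtering expression over a list of row dicts returns. A quick sketch of the difference, using made-up rows in the same shape as the metadata above:

```python
metadata = [
    {'source_system': 'src1', 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src2', 'job_schedule_cron': '0 6 * * *'},
]

source_system = 'src2'

# filtering a list of dicts yields a LIST, which Airflow rejects:
# AirflowTimetableInvalid: ['0 6 * * *'] is not acceptable
as_list = [row['job_schedule_cron'] for row in metadata
           if row['source_system'] == source_system]

# taking the first match yields a plain cron STRING, which is acceptable
as_string = as_list[0]

print(as_list)    # ['0 6 * * *']
print(as_string)  # 0 6 * * *
```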
Any suggestions or guidance on how to fix this?
Upvotes: 0
Views: 35
Reputation: 601
There is another way to solve this issue.
Step 1:
Create an Airflow Variable for each DAG; the key should be the DAG name and the value should be the cron schedule. For example, key = loop_hello_world_1, value = 0 5 * * *.
The Airflow Variables should be created while the Airflow services are coming up, at deployment time.
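One way to provision these at deployment time is to write the key/value pairs to a JSON file and load it with the airflow variables import CLI command. A sketch; the file name and the cron values here are illustrative:

```python
import json

# one Variable per DAG id, value is that DAG's cron schedule
variables = {
    "loop_hello_world_1": "0 5 * * *",
    "loop_hello_world_2": "0 6 * * *",
    "loop_hello_world_3": "0 10 * * *",
}

# write the file the deployment script will hand to Airflow
with open("airflow_variables.json", "w") as f:
    json.dump(variables, f, indent=2)

# then, as part of the deployment:
#   airflow variables import airflow_variables.json
```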
Step 2:
Inside your DAG creation code, get the Airflow Variable using the DAG name and use it as the schedule.
Similarly, you have to create a Variable in the Airflow Variables section for each DAG id.
Sample code to retrieve the value using the Variable.get function. You can also pass a default value to get; the default is used as a fallback in case the key is not present in the environment.
from airflow.models import Variable

# build a dag for each number in range(1, 4)
for n in range(1, 4):
    dag_id = "loop_hello_world_{}".format(n)

    # fetch the schedule from Airflow Variables; the second argument is a
    # default value used in case the key is not present
    schedule_cron = Variable.get(dag_id, "@daily")

    default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}
    dag_number = n

    globals()[dag_id] = create_dag(dag_id, schedule_cron, dag_number, default_args)
Upvotes: 0