Reputation: 31
I have one DAG that I pass a variety of configurations, and one of the settings I want to pass is how often it should run.
For example, using the same DAG, I have two different RUNS. RUN A I want to run daily. RUN B I want to run weekly. Both of these use the exact same DAG code but have different configurations passed.
As far as I can see, there is no easy way to pass the schedule within the configuration. The only solution I have is to make multiple DAGs with the exact same code but different schedules, which results in a lot of duplicated code.
Are there any better options?
As an example, I have a DAG that is a web crawler, and I pass it URLs to crawl. I need to change the crawl frequency for different sets of URLs. The URLs I pass can change, and there is no way to identify which schedule to use other than the run parameters.
Upvotes: 3
Views: 3882
Reputation: 16099
In this case, since a daily schedule covers the weekly one, it's best to have a single daily run and use a branch operator to decide which logic to run based on the day of the week.
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator

with DAG(
    dag_id="my_dag",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    schedule_interval="@daily",
) as dag:
    # Replace with your actual operators for the 1st configuration/logic
    task_a = EmptyOperator(task_id="logic_a")
    # Replace with your actual operators for the 2nd configuration/logic
    task_b = EmptyOperator(task_id="logic_b")

    # Follows logic_a when the run falls on a Monday, logic_b otherwise
    branch = BranchDayOfWeekOperator(
        task_id="make_choice",
        follow_task_ids_if_true="logic_a",
        follow_task_ids_if_false="logic_b",
        week_day="Monday",
    )

    branch >> [task_a, task_b]
In this example the DAG runs every day. On Monday it will follow task_a; the rest of the week it will follow task_b.
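To relate this back to the crawler in the question, the same day-of-week decision can be sketched in plain Python, outside Airflow. This is only an illustration under assumptions: the `URL_SETS` dictionary, its `frequency` field, the example URLs, and the `due_runs` helper are all hypothetical names, not part of Airflow or of the original setup.

```python
from datetime import date

# Hypothetical run configurations: each set of URLs carries its own frequency.
URL_SETS = {
    "run_a": {"urls": ["https://example.com/a"], "frequency": "daily"},
    "run_b": {"urls": ["https://example.com/b"], "frequency": "weekly"},
}


def due_runs(run_date, url_sets, weekly_day=0):
    """Return the names of the runs that should execute on run_date.

    weekly_day follows date.weekday(): 0 is Monday, 6 is Sunday.
    """
    due = []
    for name, cfg in url_sets.items():
        if cfg["frequency"] == "daily":
            # Daily runs are due on every execution of the daily DAG.
            due.append(name)
        elif cfg["frequency"] == "weekly" and run_date.weekday() == weekly_day:
            # Weekly runs are due only on the chosen weekday.
            due.append(name)
    return due


print(due_runs(date(2022, 1, 3), URL_SETS))  # Monday  -> ['run_a', 'run_b']
print(due_runs(date(2022, 1, 4), URL_SETS))  # Tuesday -> ['run_a']
```

A function along these lines could serve as the decision logic inside the daily DAG, for instance behind a Python-based branching task, so the frequency stays in the run configuration rather than in the DAG's schedule.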
Upvotes: 2