I have a DAG which runs two tasks, A and B.
Instead of specifying the start_date at the DAG level, I have added it as an attribute to the operators (I am using a PythonOperator in this case) and removed it from the DAG dictionary. Both tasks run daily.
The start_date for A is 2013-01-01 and the start_date for B is 2015-01-01. My problem is that Airflow runs task A for 16 days starting from 2013-01-01 (I guess because I left the default dag_concurrency = 16 in my airflow.cfg) and then stops. The DAG runs stay in state running and the tasks for B have no status.
Clearly I am doing something wrong. I could simply set the start_date at the DAG level and have B run from the start_date of A, but that's not what I want to do.
Alternatively, I could split them into separate DAGs, but again, that's not how I want to monitor them.
Is there a way to have a DAG with multiple tasks, each with its own start_date? If so, how do I do this?
UPDATE:
I know that a ShortCircuitOperator can be added, but it seems to work only for a chain of dependent tasks that has a downstream. In my case A is independent of B.
Upvotes: 2
Views: 2038
Use a BranchPythonOperator and check in that task whether your execution_date is >= 2015-01-01. If it is, the branch should continue to task B; if not, it should continue to a dummy task.
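A minimal sketch of the branching callable, assuming Airflow 1.10 (where BranchPythonOperator with provide_context=True passes the run context, including execution_date, as keyword arguments, and the callable returns the task_id to follow). The task ids "task_b" and "skip_b" are hypothetical placeholders for your own task B and dummy task. The logic is plain Python, so it can be checked without a running Airflow.

```python
from datetime import datetime

# Hypothetical task ids: replace with the ids your DAG actually uses.
TASK_B_ID = "task_b"   # the real task B
SKIP_B_ID = "skip_b"   # a DummyOperator that does nothing

# Task B's intended start date, taken from the question.
B_START_DATE = datetime(2015, 1, 1)


def choose_branch(**context):
    """Return the task_id that BranchPythonOperator should follow."""
    execution_date = context["execution_date"]
    # Compare calendar dates only, which avoids errors when comparing the
    # timezone-aware datetime Airflow passes with a naive constant.
    if execution_date.date() >= B_START_DATE.date():
        return TASK_B_ID
    return SKIP_B_ID
```

In the DAG this would be wired roughly as BranchPythonOperator(task_id="branch", python_callable=choose_branch, provide_context=True), with both task B and the dummy task set downstream of it, while task A stays independent. Under this approach the DAG-level start_date would presumably be 2013-01-01 and task B would no longer need its own start_date, since the branch performs the date check.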
However, I would recommend using a separate DAG.
Documentation on branching: https://airflow.readthedocs.io/en/1.10.2/concepts.html#branching
Upvotes: 1