Reputation: 513
I know how to generate a DAG's tasks dynamically using a Variable, and it works pretty well until you trigger multiple runs of the same DAG.
That is, whenever a new directory with files appears under the data/to/load/ dir, I have a script somewhere that runs airflow variables --set dir data/to/load/$newDir
followed by airflow trigger_dag dyn_test.
Now say directories "a" and "b" are created under data/to/load/ at roughly the same time, so the airflow variables + airflow trigger_dag
pair gets called twice, with two different values in the variable-set call (one suffixed with 'a' and the other with 'b', of course). I do see two runs of the DAG in the Airflow GUI, but the problem is that both of them pick up the same directory value, either a or b. That strongly suggests both runs read whichever 'airflow variables --set' call came last. How do I resolve this? What is the way to trigger multiple runs where each run gets its own value of the dir variable to loop over dynamically? My DAG looks something like this:
# Using Airflow Variables
import os
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

dir = Variable.get("dir")

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)

filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
    task1 = ...  # change 'file' structure
    task2 = ...  # store changed 'file'
    task1 >> task2
Upvotes: 1
Views: 2623
Reputation: 38992
The scenario described in your question is one where a first-in first-out queue would be a good fit, assuming you want to keep the current approach of explicitly setting the directory to be processed as a separate step.
That said, the Airflow CLI trigger_dag
command accepts a --conf
flag for setting the configuration dictionary that is attached to the DagRun,
and I'll go that way, since you've described a flow where the DAG is triggered right where the variable is set.
http://airflow.apache.org/cli.html#trigger_dag
Here is how that might look in code.
airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'
Set the provide_context
kwarg
on whichever Airflow operator you use for the tasks.
The DagRun instance can then be retrieved from the context, and the dir
value read out of its configuration.
Say you define your tasks with the Airflow PythonOperator
; then the code to retrieve dir
inside the python_callable
will look similar to this:
def me_seeks(dag_run=None, **kwargs):
    dir = dag_run.conf['me_seeks.dir']
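For completeness, here is a minimal sketch of how that callable could be wired into the DAG, assuming Airflow 1.x and the PythonOperator; the task id process_dir is illustrative, only the dyn_test DAG id and the me_seeks.dir key come from the examples above:
# Sketch only: wire the callable into the DAG and enable provide_context
# so that dag_run (and the rest of the template context) is passed as kwargs.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

# schedule_interval=None because this DAG is only ever triggered externally
dag = DAG(dag_id='dyn_test', default_args=args, schedule_interval=None)

def me_seeks(dag_run=None, **kwargs):
    # dag_run.conf is the dictionary passed via `trigger_dag --conf`
    dir = dag_run.conf['me_seeks.dir']
    # ... process the files found under `dir` here ...

process_dir = PythonOperator(
    task_id='process_dir',  # illustrative task id
    python_callable=me_seeks,
    provide_context=True,
    dag=dag,
)
With this setup, each trigger_dag ... --conf call creates its own DagRun carrying its own directory, so concurrent runs no longer overwrite each other the way a single shared Variable does.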
Upvotes: 1