Reputation: 3074
So I have this quite nice DAG in Airflow which basically runs several analysis steps (implemented as Airflow plugins) on binary files. The DAG is triggered by an FTP sensor that checks whether there is a new file on the FTP server and then starts the whole workflow.
Currently the workflow looks like this: DAG is triggered as scheduled -> sensor waits for a new file on the FTP server -> analysis steps are executed -> end of workflow.
What I'd like instead is: DAG is triggered -> sensor waits for a new file on the FTP server -> for every file on the FTP server the analysis steps are executed individually -> each workflow ends individually.
How do I get the analysis workflow to run for each file on the FTP server, while, if there is no file, having just one sensor wait for a new one? I don't want to, e.g., start a DAG every second or so, because then I'd have many sensors all waiting for a new file.
Upvotes: 4
Views: 1380
Reputation: 2566
Use two DAGs to separate the sensing step from the analysis steps.
DAG 1:
sensor waits for a new file on the FTP server -> once a new file lands, use a TriggerDagRunOperator to re-trigger DAG 1 itself (so exactly one sensor is always waiting) -> use another TriggerDagRunOperator to trigger DAG 2
DAG 2:
do the analysis steps for the file
Upvotes: 3