pinas

Run an Airflow DAG for each file

So I have a fairly nice DAG in Airflow that runs several analysis steps (implemented as Airflow plugins) on binary files. The DAG is triggered by an FTP sensor, which checks whether there is a new file on the FTP server and then starts the whole workflow.

Currently the workflow looks like this: the DAG is triggered as defined -> the sensor waits for a new file on the FTP server -> the analysis steps are executed -> the workflow ends.

What I'd like is this: the DAG is triggered -> the sensor waits for a new file on the FTP server -> for every file on the server, the analysis steps are executed individually -> each per-file workflow ends individually.

How do I get the analysis workflow to run for each file on the FTP server while, if there is no file on the server, only a single sensor waits for a new one? I don't want to, e.g., start the DAG every second or so, because then I would have many sensors all waiting for a new file.

Answers (1)

Ryan Yuan

Use two DAGs to separate the sensing step from the analysis steps.

DAG 1:

The sensor waits for a new file on the FTP server -> once a new file lands, use TriggerDagRunOperator to trigger DAG 1 itself (so that a fresh sensor waits for the next file) -> use TriggerDagRunOperator to trigger DAG 2 for the file that just arrived
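Below is a minimal, Airflow-free sketch of what a single run of DAG 1 decides. It is a model, not a DAG file: the DAG ids, the `Trigger` class and the `poke` / `sensor_dag_run` functions are hypothetical stand-ins. In a real DAG the check would be an FTP sensor task, and each `Trigger` would be a `TriggerDagRunOperator` task (in Airflow 2.x it takes a `trigger_dag_id` and a `conf` dict, and DAG 2 can read the value back as `dag_run.conf["filename"]`). The sketch also assumes the set of already-handled files is tracked somewhere, since the answer does not say how to avoid picking up the same file twice (moving or deleting the file after analysis is one option).

```python
from dataclasses import dataclass, field

SENSOR_DAG_ID = "ftp_sensor_dag"       # hypothetical dag_id of DAG 1
ANALYSIS_DAG_ID = "file_analysis_dag"  # hypothetical dag_id of DAG 2


@dataclass
class Trigger:
    """A DAG run that DAG 1 asks Airflow to start (models TriggerDagRunOperator)."""
    dag_id: str
    conf: dict = field(default_factory=dict)


def poke(listing, processed):
    """Sensor check: return the first file not yet handed off, or None.

    listing   -- file names currently on the FTP server (hypothetical input)
    processed -- names already passed to DAG 2 (assumed to be tracked somewhere)
    """
    for name in sorted(listing):
        if name not in processed:
            return name
    return None


def sensor_dag_run(listing, processed):
    """One run of DAG 1: wait for a file, then fan out two triggers."""
    new_file = poke(listing, processed)
    if new_file is None:
        return []  # nothing new: this single sensor keeps waiting
    processed.add(new_file)
    return [
        # DAG 2 gets the file name, so each file gets its own analysis run.
        Trigger(ANALYSIS_DAG_ID, {"filename": new_file}),
        # DAG 1 re-triggers itself so exactly one new sensor takes over.
        Trigger(SENSOR_DAG_ID),
    ]
```

Because every DAG 1 run hands off at most one file and starts exactly one successor, there is never more than one sensor waiting at a time.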

DAG 2:

Run the analysis steps for that file.
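To check that the two-DAG pattern gives what the question asks for (one analysis run per file, and only a single sensor left waiting once the server has no new files), it can be simulated in plain Python. This is a toy model under stated assumptions, not Airflow: `run_pipeline` and its arguments are hypothetical, the queue stands in for the scheduler, and `analyse` stands in for DAG 2's analysis steps.

```python
from collections import deque


def run_pipeline(ftp_files, analyse):
    """Simulate DAG 1 (sensor) and DAG 2 (analysis) with a run queue.

    ftp_files -- file names present on the FTP server (hypothetical input)
    analyse   -- callable standing in for DAG 2's analysis steps
    Returns (results, waiting): one result per file, and how many DAG 1
    runs are left waiting when no unprocessed file remains.
    """
    processed = set()
    queue = deque([("sensor", None)])  # DAG 1 is started once
    results = {}
    waiting = 0
    while queue:
        kind, filename = queue.popleft()
        if kind == "sensor":
            new = next((f for f in sorted(ftp_files) if f not in processed), None)
            if new is None:
                waiting += 1  # this sensor would keep poking the FTP server
                continue
            processed.add(new)
            queue.append(("analysis", new))  # trigger DAG 2 for this file
            queue.append(("sensor", None))   # re-trigger DAG 1 itself
        else:
            # A DAG 2 run: independent analysis of exactly one file.
            results[filename] = analyse(filename)
    return results, waiting
```

For example, `run_pipeline(["b.bin", "a.bin", "c.bin"], len)` produces one result per file and ends with a single waiting sensor, which is the behaviour the question asks for.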
