Reputation: 12361
I want a single DAG that downloads data from an FTP server. I don't need all the data on the server, just certain files. The files are uploaded daily at set times throughout the day, and I want to retrieve each one shortly after it becomes available on the FTP site.
Example FTP schedule:
/Data/US/XASE/yyyymmdd.csv #uploaded daily at 9:00 PM UTC
/Data/EU/TRWB/yyyymmdd.csv #uploaded daily at 1:00 PM UTC
...
/Data/EU/XEUR/yyyymmdd.csv #uploaded daily at 11:00 AM UTC
How can I set the schedule in the DAG so that I copy each file from the FTP site as it is uploaded, without having a separate DAG for each upload time?
Upvotes: 0
Views: 2292
Reputation: 2956
I think you have three options for scheduling here.
Option 1
You run exactly at 11 AM, 1 PM and 9 PM UTC with the following schedule: 0 11,13,21 * * *. Or maybe 5 minutes after the full hour to add some buffer (5 11,13,21 * * *).
Option 2
You run the DAG more regularly and check if the files are available and then download them within the Task. This makes sense if there is a higher chance that the file upload is delayed.
For example, */10 10-22 * * * would run every 10 minutes from 10:00 through 22:50.
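A minimal sketch of the "check, then download" step for Option 2, using only the standard library's ftplib. The helper names (local_target, pending_paths, download_if_available) and the flat local file naming are my own assumptions, not anything Airflow provides. The idea is that each run only asks for files that have no local copy yet, and a file that is not on the server yet is simply skipped until the next run.

```python
import os
from ftplib import error_perm


def local_target(remote_path, local_dir):
    """Map a remote path to a flat local file name (hypothetical naming scheme)."""
    # /Data/EU/XEUR/20240102.csv -> <local_dir>/Data_EU_XEUR_20240102.csv
    return os.path.join(local_dir, remote_path.strip("/").replace("/", "_"))


def pending_paths(expected_paths, local_dir):
    """Return the expected remote paths that have not been downloaded yet."""
    return [p for p in expected_paths
            if not os.path.exists(local_target(p, local_dir))]


def download_if_available(ftp, remote_path, local_dir):
    """Download remote_path if the server lists it; return True on success.

    `ftp` is a connected, logged-in ftplib.FTP instance.
    """
    directory, name = remote_path.rsplit("/", 1)
    try:
        # Some servers return bare names, others full paths; compare base names.
        listed = [entry.rsplit("/", 1)[-1] for entry in ftp.nlst(directory)]
    except error_perm:
        # Some servers answer with a 550 error for an empty or missing directory.
        return False
    if name not in listed:
        return False
    target = local_target(remote_path, local_dir)
    partial = target + ".part"
    with open(partial, "wb") as handle:
        ftp.retrbinary("RETR " + remote_path, handle.write)
    # Rename only after the transfer finished, so a failed run leaves no
    # file that a later run would mistake for a completed download.
    os.replace(partial, target)
    return True
```

Inside the task you would open the connection (for example with ftplib.FTP(host) followed by login), loop over pending_paths(...) and call download_if_available for each entry.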
Option 3
You schedule the DAG once per day (@daily; note that @once would run it only a single time) and then work with TimeDeltaSensor. I think this option is the least preferable, as you have a lot of tasks just "waiting", each occupying a worker slot, which can block the execution of other Airflow tasks.
Besides that, it also depends heavily on how you want to handle the download from the FTP itself.
I guess you could create one task per file to download each day and put a BranchPythonOperator task in front of them to avoid trying to download the same file multiple times.
Or you put the whole logic into a PythonOperator, including logic that downloads only certain files based on execution_date.
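To illustrate the "pick files based on the run's timestamp" idea, here is a rough sketch in plain Python. The mapping only contains the three paths spelled out in the question; UPLOAD_HOUR_UTC and files_due are hypothetical names, and I am assuming the yyyymmdd in the file name is the UTC date of the upload. One thing to check: depending on your Airflow version, execution_date can be the start of the schedule interval rather than the moment the run fires, so make sure you pass the timestamp you actually mean.

```python
from datetime import datetime, timezone

# Upload hour (UTC) per remote directory, taken from the example schedule
# in the question. Extend this with the remaining directories.
UPLOAD_HOUR_UTC = {
    "/Data/EU/XEUR": 11,
    "/Data/EU/TRWB": 13,
    "/Data/US/XASE": 21,
}


def files_due(run_time):
    """Return the remote paths whose upload hour matches the hour of run_time."""
    stamp = run_time.strftime("%Y%m%d")  # assumed to match yyyymmdd in the file name
    return [
        f"{directory}/{stamp}.csv"
        for directory, hour in UPLOAD_HOUR_UTC.items()
        if hour == run_time.hour
    ]


if __name__ == "__main__":
    print(files_due(datetime(2024, 1, 2, 13, 5, tzinfo=timezone.utc)))
```

The same function could back either variant: the callable of a PythonOperator downloads whatever files_due returns, or a branch callable translates the result into the task ids to follow.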
Upvotes: 1