Reputation: 1148
I am new to Airflow, and I feel like I may be missing some convention or concept.
Context: I have files being periodically dropped into an S3 bucket. My pipeline will need to grab new files and process them.
Basically: How do I avoid re-processing?
It is not unlikely that some part of the pipeline will change in the future and I will want to re-process files. But on a day-to-day basis I don't want to re-process files. Additionally there will likely be other pipelines in the future which would need to start from the beginning and process all the files for a different output.
I have plenty of scrappy ways of preserving state (a local JSON file, or checking the existence of output files), but I'm wondering if there's a convention in Airflow. What makes the most sense to me at the moment is to reuse the Postgres instance that already exists for Airflow (maybe bad form?), add another database, and start creating tables in there that track which input files have been processed for workflow X, workflow Y, etc.
How would you do this?
Upvotes: 5
Views: 1881
Reputation: 9507
Here is how I have solved a similar problem with a 4-task DAG.

Write a custom S3Sensor that extends BaseSensorOperator. This sensor uses the boto3 library and watches a specific folder in the bucket. If any files are put into that folder, it posts all the file paths to XCom. This sensor is the first operator in the DAG.
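A minimal sketch of what such a sensor could look like, assuming Airflow 2.x import paths; the bucket, prefix, and XCom key names here are placeholders I've made up, not part of the original setup:

```python
import boto3
from airflow.sensors.base import BaseSensorOperator


class NewFileS3Sensor(BaseSensorOperator):
    """Pokes an S3 prefix and pushes any keys it finds to XCom."""

    def __init__(self, bucket, prefix, **kwargs):
        super().__init__(**kwargs)
        self.bucket = bucket
        self.prefix = prefix

    def poke(self, context):
        s3 = boto3.client("s3")
        response = s3.list_objects_v2(Bucket=self.bucket, Prefix=self.prefix)
        keys = [obj["Key"] for obj in response.get("Contents", [])]
        if not keys:
            return False  # nothing there yet, keep poking
        # Post all the file paths to XCom for the downstream tasks.
        context["ti"].xcom_push(key="new_file_keys", value=keys)
        return True
```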
The next operator in the DAG is a PythonOperator that reads the list from the previous task's XCom. It moves all the files to another folder in the same bucket, again posting the new paths to XCom.
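For example, the move step could be a callable like the one below (a sketch, not the exact code: the bucket and prefix names are invented, and the task_id and XCom key match the hypothetical sensor above):

```python
import boto3


def move_new_files(**context):
    """Move the files found by the sensor to a separate prefix, then
    hand the new paths to the processing task via XCom."""
    s3 = boto3.client("s3")
    keys = context["ti"].xcom_pull(task_ids="watch_for_new_files", key="new_file_keys")
    moved = []
    for key in keys or []:
        new_key = key.replace("incoming/", "processing/", 1)
        # S3 has no rename: copy to the new prefix, then delete the original.
        s3.copy_object(
            Bucket="my-bucket",
            CopySource={"Bucket": "my-bucket", "Key": key},
            Key=new_key,
        )
        s3.delete_object(Bucket="my-bucket", Key=key)
        moved.append(new_key)
    return moved  # the return value is pushed to XCom automatically
```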
The next operator processes each of these files.
The last operator triggers this same DAG again (so we start back at the custom S3 file sensor, because the DAG retriggers itself).
The DAG must not have a schedule_interval and needs to be triggered once manually. It will then watch the bucket forever, or until something breaks.
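Putting it together, the wiring might look roughly like this (again Airflow 2.x imports; NewFileS3Sensor and move_new_files are the hypothetical pieces sketched above, and process_moved_files is a stand-in for whatever your processing step does):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="s3_file_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # no schedule: trigger once manually, then it retriggers itself
    catchup=False,
) as dag:
    watch = NewFileS3Sensor(
        task_id="watch_for_new_files",
        bucket="my-bucket",
        prefix="incoming/",
    )
    move = PythonOperator(task_id="move_files", python_callable=move_new_files)
    process = PythonOperator(task_id="process_files", python_callable=process_moved_files)
    retrigger = TriggerDagRunOperator(
        task_id="retrigger_self",
        trigger_dag_id="s3_file_pipeline",  # points back at this same DAG
    )

    watch >> move >> process >> retrigger
```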
Upvotes: 3