Reputation: 888
In the real system, sensor data will be dumped into a specific directory as CSV files. A data pipeline will then load that data into a database, and another pipeline will send it to a predict service.
As of now I only have training and validation CSV files, so I'm planning to simulate the flow of sending data to the predict service in the following way:
DAG1 - Every 2 minutes, select some files at random from a specific path and update their timestamps (a rough sketch follows below). Later, I may choose to add a random delay after the start node.
DAG2 - A FileSensor pokes every 3 minutes. If it finds a subset of files with modified timestamps, it should pass those on to the subsequent stages, which eventually run the predict service.
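For DAG1, this is roughly what I have in mind (just a sketch; the directory path, schedule and number of files are placeholders):

```python
import os
import random
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

DATA_DIR = "/data/incoming"  # placeholder for the directory holding the csv files


def touch_random_files():
    """Pick a few csv files at random and bump their modification time to 'now'."""
    csv_files = [
        os.path.join(DATA_DIR, name)
        for name in os.listdir(DATA_DIR)
        if name.endswith(".csv")
    ]
    for path in random.sample(csv_files, k=min(3, len(csv_files))):
        os.utime(path, None)  # sets atime/mtime to the current time


with DAG(
    dag_id="dag1_touch_files",
    start_date=datetime(2021, 1, 1),
    schedule_interval="*/2 * * * *",  # every 2 minutes
    catchup=False,
) as dag:
    PythonOperator(
        task_id="touch_files",
        python_callable=touch_random_files,
    )
```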
It looks to me like I can't achieve this with FileSensor as-is. I'll have to derive from the FileSensor class (say, MyDirSensor), check the timestamps of all the files, select the ones modified after the last successful poke, and pass those forward.
Is my understanding correct? If yes, can I store the timestamp of the last successful poke in some variable of MyDirSensor? Can I push/pull this data to/from XCom? What would the task_id be in that case? Also, how do I pass the list of files to the next task?
Is there any better approach (a different sensor, etc.) for this simulation? I'm currently running the whole flow on a standalone machine. My Airflow version is 1.10.15.
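For reference, this is roughly what I have in mind for MyDirSensor (just a sketch, not working code; the Variable key and the XCom key are guesses on my part):

```python
import os

from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.models import Variable


class MyDirSensor(FileSensor):
    """Succeeds when at least one csv under `filepath` was modified after the
    timestamp remembered from the last successful poke."""

    def poke(self, context):
        last_success = float(Variable.get("my_dir_sensor_last_success", default_var=0))
        new_files = [
            os.path.join(self.filepath, name)
            for name in os.listdir(self.filepath)
            if name.endswith(".csv")
            and os.path.getmtime(os.path.join(self.filepath, name)) > last_success
        ]
        if not new_files:
            return False
        # remember the newest mtime seen, so the next run only picks up newer files
        Variable.set(
            "my_dir_sensor_last_success",
            str(max(os.path.getmtime(path) for path in new_files)),
        )
        # make the file list available to downstream tasks via
        # ti.xcom_pull(task_ids="<this sensor's task_id>", key="new_files")
        context["ti"].xcom_push(key="new_files", value=new_files)
        return True
```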
Upvotes: 0
Views: 1449
Reputation: 20077
I am not sure the current Airflow approach is actually the best fit for this use case. In its current incarnation, Airflow is really all about working on "data intervals": each "DAG run" is connected to some "data interval" and is supposed to process the data for that interval. Classic batch processing.
If I understand correctly, your case is more like streaming - not entirely, but close. You get some subset of data that arrived since the last run and you process that data. This is not what the current version of Airflow - not even 2.1 - is designed to handle, because it requires complex manipulation of "state" that is not "data interval" related (and Airflow currently excels at the "data interval" case).
You can indeed write custom operators to handle that. I don't think there is a ready-to-reuse pattern in Airflow for what you want to achieve, but Airflow is flexible enough that if you write your own operators you can certainly work around it and implement what you want. And writing operators in Airflow is super easy: an operator is a simple Python class with an "execute" method, which can reuse existing Hooks to reach out to external services/storage and use XCom for communication between tasks. It's surprisingly easy to add a new operator doing even complex logic (again, reusing Hooks to make communication with external services easier). So I think it's still worth using Airflow for what you want to do.
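To illustrate how little a custom operator needs (a sketch only; the class name, the upstream task id and the XCom key are made up, and the call to the predict service is left as a stub):

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SendToPredictServiceOperator(BaseOperator):
    """Pulls the list of new files pushed by the upstream sensor via XCom
    and hands each of them to the predict service."""

    @apply_defaults
    def __init__(self, sensor_task_id, *args, **kwargs):
        super(SendToPredictServiceOperator, self).__init__(*args, **kwargs)
        self.sensor_task_id = sensor_task_id

    def execute(self, context):
        files = context["ti"].xcom_pull(
            task_ids=self.sensor_task_id, key="new_files") or []
        for path in files:
            self.log.info("Sending %s to the predict service", path)
            # here you would reuse an existing Hook (e.g. HttpHook)
            # to actually call the service
```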
How I would approach it: rather than modifying the timestamps of the files, I'd create additional files - markers - with the same names but different extensions, and base the processing logic on those (this way you can use the external storage as the "state"). I don't think there are any ready-made operators or sensors to help with it, but again, writing a custom one is easy and should work.
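A minimal sketch of that marker idea, assuming ".ready" markers are dropped next to the csv files and renamed to ".done" once processed (the extensions are arbitrary):

```python
import glob
import os


def find_unprocessed_files(data_dir):
    """Return csv files that have a '.ready' marker but no '.done' marker yet."""
    unprocessed = []
    for marker in glob.glob(os.path.join(data_dir, "*.csv.ready")):
        csv_path = marker[: -len(".ready")]
        if os.path.exists(csv_path) and not os.path.exists(csv_path + ".done"):
            unprocessed.append(csv_path)
    return unprocessed


def mark_processed(csv_path):
    """Rename the '.ready' marker to '.done' so the file is not picked up again."""
    os.rename(csv_path + ".ready", csv_path + ".done")
```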
However, soon (within several months), Airflow 2.2 (and even more so 2.3) will bring some changes - mainly around flexible scheduling, decoupling DAG runs from data intervals, and finally allowing dynamic DAGs with a flexible structure that can change per run - that will provide a nicer way of handling cases similar to yours.
Stay tuned - for now rely on your own logic, but look out for opportunities to simplify it in the future, when Airflow will be better suited for your case.
And in the meantime, do upgrade to Airflow 2. It's well worth it, and Airflow 1.10 reached end of life in June, so the sooner you do, the better - there will not be any more fixes to Airflow 1.10 (not even critical security fixes).
Upvotes: 1