Reputation: 1244
I came across this article using the watchdog API and it seemed like exactly what I need: https://medium.com/@phanikumaryadavilli/hacking-apache-airflow-to-trigger-dags-based-on-filesystem-events-25f822fd08c3
(code was not written by me)
import os
import time
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime, timedelta

dag = DAG(
    dag_id="test_trigger_dag_operator",
    default_args={"owner": "Airflow", "start_date": datetime(2020, 3, 9)},
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    dag=dag,
)

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if event.event_type == 'created':
            print("file created")
            print('Executing the dag')
            trigger

def main():
    observer = Observer()
    event_handler = FileSystemEventHandler()
    observer_path = os.getcwd()
    observer.schedule(Handler(), observer_path, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

if __name__ == '__main__':
    main()
Unfortunately, with the author's code the only thing the DAG does is unconditionally trigger the target DAG; main() never gets called, i.e. there is no file-watching at all.
I made some slight modifications: I added the python_callable argument to the TriggerDagRunOperator, and gave main the arguments it then needs (context, dag_run_obj):
trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    python_callable=main,
    dag=dag,
)
and removing the
if __name__ == '__main__':
    main()
part.
Now the file-watchdog is working, but the target DAG is still triggered once in any case, and the scheduler hangs as soon as the DAG starts. (Which is kind of what one would expect with while True.)
How can I use the provided code in a working manner?
Upvotes: 4
Views: 5392
Reputation: 464
Airflow has its own service, called DagBag Filling, that parses your DAG file and puts it in the DagBag; a DagBag is the collection of DAGs you see both in the UI and in the metadata DB.
While filling the DagBag from your file (parsing any DAG in it), the parsing actually never ends: you are running that watcher inside the DAG file definition itself.
To avoid this scenario you'll need to implement a Sensor:
Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
A sensor will do exactly what you want while preventing your environment from crashing. Essentially, you reimplement your main function in the overridden poke() method of your custom sensor.
You can check any existing sensor in the contrib repo, or write a custom sensor based on your needs; a minimal sketch follows.
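Here is a minimal sketch of such a custom sensor, assuming Airflow 1.10.x to match the imports in the question; the dag_id, task ids, and watched path are illustrative, and the TriggerDagRunOperator arguments are copied from the question (its exact signature differs between Airflow versions):

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class FileCreatedSensor(BaseSensorOperator):
    """Waits until a file appears, instead of blocking in a while-True loop."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(FileCreatedSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # The scheduler calls this every poke_interval seconds; returning
        # False keeps waiting, returning True releases the downstream task.
        return os.path.exists(self.filepath)


dag = DAG(
    dag_id="watch_then_trigger",  # illustrative dag_id
    default_args={"owner": "Airflow", "start_date": datetime(2020, 3, 9)},
    schedule_interval=None,
)

wait_for_file = FileCreatedSensor(
    task_id="wait_for_file",
    filepath="/tmp/watched/trigger.txt",  # illustrative path
    poke_interval=30,
    dag=dag,
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    dag=dag,
)

wait_for_file >> trigger

All the waiting now happens inside poke(), which the scheduler runs as a task, so parsing the DAG file itself stays fast and nothing blocks.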
Also, if you just want to trigger a DAG from your application, you can submit a POST request for that dag_id through the REST API; a short sketch is below.
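For the REST approach, a minimal sketch against the experimental API shipped with Airflow 1.10.x (host, port, and dag_id are illustrative; Airflow 2 replaces this with the stable /api/v1 endpoint):

import json

import requests

# Ask the webserver to create a new run of the target DAG.
response = requests.post(
    "http://localhost:8080/api/experimental/dags/dummy_operator/dag_runs",
    data=json.dumps({"conf": {"message": "Hello World"}}),
    headers={"Content-Type": "application/json"},
)
response.raise_for_status()
print(response.json())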
Either of the two implementations will fix your issue.
Upvotes: 2