Chris

Reputation: 1244

Trigger dag via file watcher in airflow

I came across this article using the watchdog API and it seemed like exactly what I need: https://medium.com/@phanikumaryadavilli/hacking-apache-airflow-to-trigger-dags-based-on-filesystem-events-25f822fd08c3

(code was not written by me)

import os
import time
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime, timedelta


dag = DAG(
    dag_id="test_trigger_dag_operator",
    default_args={"owner": "Airflow", "start_date": datetime(2020, 3, 9)},
)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    dag=dag,
    )

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if event.event_type == 'created':
            print("file created")
            print('Executing the dag')
            trigger

def main():
    observer = Observer()
    event_handler = FileSystemEventHandler()
    observer_path = os.getcwd()
    observer.schedule(Handler(), observer_path, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

if __name__ == '__main__':
    main()

Unfortunately, using the author's code, the only thing the DAG does is unconditionally trigger the target DAG, and main() never gets called, i.e. there is no file-watching either.

I made some slight modifications to the code, adding the python_callable attribute to the TriggerDagRunOperator as well as adding the necessary args to main (context, dag_run_obj):

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    python_callable=main,
    dag=dag,
    )

and removing the

if __name__ == '__main__':
    main()

part.

Now the file-watchdog is working, but the target DAG is still triggered once in any case, and the scheduler hangs as soon as the DAG starts. (Which is kind of what one would expect with while True.) How can I use the provided code in a working manner?

Upvotes: 4

Views: 5392

Answers (1)

Alejandro Kaspar

Reputation: 464

Airflow has its own service called DagBag filling, which parses your DAG file and puts the DAGs into the DagBag. The DagBag is the collection of DAGs you see both in the UI and in the metadata DB.

While filling the DagBag from your file (i.e. parsing any DAG in it), the parse actually never ends: you are running that watcher loop inside the DAG file definition itself.

To avoid such a scenario you'll need to implement a Sensor:

Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…

A sensor will do exactly what you want while preventing your environment from crashing. Essentially, you reimplement your main function in the overridden poke() method of your custom sensor.
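A minimal sketch of such a sensor (the class name FileCreatedSensor and the watched path are illustrative, not from the post; the import fallback only exists so the snippet can be read and tried outside a full Airflow installation):

```python
import os

try:
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
except ImportError:
    # Minimal stand-in so the sketch can be exercised without Airflow.
    class BaseSensorOperator(object):
        def __init__(self, *args, **kwargs):
            pass


class FileCreatedSensor(BaseSensorOperator):
    """Succeeds once a file appears at `filepath`."""

    def __init__(self, filepath, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # The scheduler calls this every `poke_interval` seconds;
        # returning True marks the sensor task as successful, so a
        # downstream task (e.g. your TriggerDagRunOperator) can run.
        return os.path.exists(self.filepath)
```

In the DAG you would then chain the sensor in front of the trigger, e.g. `watch >> trigger`, so the TriggerDagRunOperator only fires after the file exists, and the infinite while/sleep loop disappears entirely.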

You can check any existing sensor in the contrib repo, or write your custom sensor based on your needs.

Also, if you just want to trigger a DAG from your application, you can submit a POST request for that dag_id through the REST API.
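For example (a hedged sketch: the host and port are assumptions for a default local webserver; Airflow 1.10 ships the experimental endpoint used below, while Airflow 2 replaces it with the stable /api/v1/dags/{dag_id}/dagRuns endpoint, which also requires authentication):

```python
import json
import urllib.request

AIRFLOW_HOST = "http://localhost:8080"  # assumption: default webserver address


def build_trigger_request(dag_id, conf):
    """Build the URL and JSON body for the experimental trigger endpoint."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(AIRFLOW_HOST, dag_id)
    body = json.dumps({"conf": conf})
    return url, body


def trigger_dag(dag_id, conf):
    """POST a new dag_run for `dag_id`, passing `conf` to the run."""
    url, body = build_trigger_request(dag_id, conf)
    req = urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Raises urllib.error.HTTPError if the webserver rejects the request.
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Your file watcher could then live outside Airflow entirely and simply call `trigger_dag("dummy_operator", {"message": "Hello World"})` whenever a file appears.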

Either implementation will fix your issue.

Upvotes: 2
