Reputation: 870
Is it possible to schedule a DAG run when a message arrives in an SQS queue? I also need the DAG to process the message. From what I know, this could be done with the SQSSensor, but I couldn't find any examples and I'm unsure how to proceed.
Upvotes: 0
Views: 2053
Reputation: 3054
Airflow's scheduler starts DAG runs on a time-based interval, whereas you want to trigger a DAG per event. You'll have to do this outside of Airflow, e.g. with a Lambda function triggered by the queue that starts an Airflow DAG run via the REST API.
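A minimal sketch of the Lambda side, assuming a self-managed Airflow 2.x webserver whose stable REST API (`/api/v1`) accepts HTTP basic authentication. The environment variable names (`AIRFLOW_BASE_URL`, `AIRFLOW_DAG_ID`, `AIRFLOW_USER`, `AIRFLOW_PASSWORD`) and the helper names are hypothetical. One caveat: when SQS is configured as the Lambda's trigger, Lambda deletes the batch from the queue once the function returns successfully, so a sensor in the triggered DAG may find the queue empty. This sketch therefore also forwards the message bodies in the run's `conf`, which the DAG can read from `dag_run.conf`.

```python
import base64
import json
import os
import urllib.request


def build_conf(records):
    """Collect the SQS records of one Lambda invocation into a DAG run conf."""
    return {
        "messages": [
            {"message_id": record["messageId"], "body": record["body"]}
            for record in records
        ]
    }


def build_dag_run_request(base_url, dag_id, conf, username, password):
    """Build a POST to Airflow 2's stable REST API that creates one DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=json.dumps({"conf": conf}).encode("utf-8"),
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


def lambda_handler(event, context):
    """Lambda entry point, invoked with a batch of SQS records."""
    request = build_dag_run_request(
        base_url=os.environ["AIRFLOW_BASE_URL"],  # hypothetical env var names
        dag_id=os.environ["AIRFLOW_DAG_ID"],
        conf=build_conf(event.get("Records", [])),
        username=os.environ["AIRFLOW_USER"],
        password=os.environ["AIRFLOW_PASSWORD"],
    )
    # urlopen raises on 4xx/5xx, so a failed trigger fails the invocation and
    # the batch stays on the queue to be retried.
    with urllib.request.urlopen(request, timeout=10) as response:
        dag_run = json.loads(response.read())
    return {"dag_run_id": dag_run.get("dag_run_id")}
```

This creates one DAG run per Lambda invocation (i.e. per batch of records) rather than one per message, which keeps the number of API calls down; a batch size of 1 on the trigger gives you one run per message instead.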
The SQSSensor in Airflow won't give you event-by-event processing, because it only polls the queue once a DAG run has started (checking for new messages, pushing any it finds to an XCom with key "messages", and deleting them from the queue). So if your DAG is scheduled to run once a day, the SQSSensor will only start polling for new messages once a day.
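To make the polling behaviour concrete, here is a simplified sketch of roughly what one sensor poke does, written against the boto3 SQS client methods `receive_message` and `delete_message_batch`. It is an illustration, not the Amazon provider's actual implementation; `poll_queue_once` and `FakeSqsClient` are hypothetical names, and the fake client only exists so the sketch runs without AWS credentials.

```python
def poll_queue_once(sqs_client, queue_url, max_messages=5, wait_time_seconds=1,
                    delete_after_read=True):
    """Receive up to max_messages and optionally delete them, like one sensor poke."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS allows 1-10 per call
        WaitTimeSeconds=wait_time_seconds,  # long-polling wait
    )
    messages = response.get("Messages", [])
    if messages and delete_after_read:
        # Delete by receipt handle so no other consumer receives them again.
        sqs_client.delete_message_batch(
            QueueUrl=queue_url,
            Entries=[
                {"Id": m["MessageId"], "ReceiptHandle": m["ReceiptHandle"]}
                for m in messages
            ],
        )
    return messages  # the sensor would push this list to XCom under "messages"


class FakeSqsClient:
    """In-memory stand-in for a boto3 SQS client, for trying this out locally."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.deleted = []

    def receive_message(self, QueueUrl, MaxNumberOfMessages=1, WaitTimeSeconds=0):
        # Received messages are simply removed here; real SQS hides them
        # until the visibility timeout expires instead.
        batch = self.pending[:MaxNumberOfMessages]
        self.pending = self.pending[MaxNumberOfMessages:]
        return {"Messages": batch} if batch else {}

    def delete_message_batch(self, QueueUrl, Entries):
        self.deleted.extend(entry["ReceiptHandle"] for entry in Entries)
```

With a real queue you would pass `boto3.client("sqs")` instead of the fake client. The key point is that nothing happens until something calls this function, which is why the sensor only sees messages while a DAG run is active.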
I can't find an SQSOperator in Airflow for reading SQS messages, so my best guess for an event-triggered SQS + Airflow workflow is this: set up a Lambda that triggers the Airflow DAG via the REST API, and start the DAG itself with an SQSSensor that reads all messages on the queue, followed by tasks that read and process the values from the XCom created by the SQSSensor task. The DAG's schedule_interval can be set to None, since it will only be triggered via the REST API.
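A minimal sketch of the DAG side, assuming Airflow 2.4+ (where `schedule` replaces `schedule_interval`; on older versions pass `schedule_interval=None`) and a recent `apache-airflow-providers-amazon` package, in which the sensor class is named `SqsSensor`. The queue URL is hypothetical, and `parse_sqs_messages` assumes producers send JSON bodies (falling back to the raw string otherwise). The Airflow imports are wrapped in `try`/`except` only so the parsing helper can be tested without Airflow installed; a real DAG file would import them directly.

```python
import json
from datetime import datetime


def parse_sqs_messages(messages):
    """Decode the bodies of the messages the sensor pushed to XCom."""
    bodies = []
    for message in messages or []:
        body = message["Body"]
        try:
            bodies.append(json.loads(body))  # assumes JSON payloads
        except json.JSONDecodeError:
            bodies.append(body)  # keep non-JSON bodies as plain strings
    return bodies


try:
    from airflow.decorators import dag, task
    from airflow.providers.amazon.aws.sensors.sqs import SqsSensor
    AIRFLOW_AVAILABLE = True
except ImportError:
    # Lets parse_sqs_messages be imported and tested without Airflow.
    AIRFLOW_AVAILABLE = False

if AIRFLOW_AVAILABLE:

    @dag(
        schedule=None,  # no timetable: runs are only created via the REST API
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )
    def sqs_triggered_dag():  # the function name becomes the DAG id
        # Polls the queue; messages it finds go to XCom under the key "messages".
        wait_for_messages = SqsSensor(
            task_id="wait_for_messages",
            sqs_queue="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue",  # hypothetical
            aws_conn_id="aws_default",
            max_messages=10,
        )

        @task
        def process_messages(ti=None):  # Airflow injects the task instance as `ti`
            messages = ti.xcom_pull(task_ids="wait_for_messages", key="messages")
            for body in parse_sqs_messages(messages):
                print(f"Processing message: {body}")  # replace with real processing

        wait_for_messages >> process_messages()

    sqs_triggered_dag()
```

Keeping the parsing in a plain function, outside the task, makes it easy to unit-test independently of the scheduler.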
Upvotes: 3