sayanti bhattacharjee

Reputation: 39

How to display Airflow DAG status in Big Query tables

I want to write the final status (success/failure) of an Airflow DAG to a table in BigQuery. The table could contain columns such as Date-Time, DAG-Name, and Status, and it would be populated according to the final status of each DAG run.

Please help; how can this be achieved?

Upvotes: 1

Views: 1836

Answers (4)

Ashish Ojha

Reputation: 1

I have used a log router to send Airflow worker logs to BigQuery tables, and then used those tables to create a view that extracts the DAG name, scheduled date, tasks, task start time, end time, duration, DAG duration, and DAG status.

Here is the approach:

  1. Create a log router that loads the worker logs into a table, using the filter (resource.type="cloud_composer_environment" logName="projects/{project_id}/logs/airflow-worker")

  2. Use the airflow-worker table to create a view (modify it according to your requirements):

    WITH TaskInfo AS (
      SELECT DISTINCT
        B.DAG_Name,
        B.Task_Name,
        B.Dag_scheduled_date_run,
        B.Dag_scheduled_start_time AS Dag_scheduled_start_time,
        B.Running_Task_start_time,
        CAST(C.Task_start_time AS TIMESTAMP) AS C_Task_start_time,
        CAST(C.Task_end_time AS TIMESTAMP) AS C_Task_end_time,
        C.Task_run_status
      FROM (
        SELECT DISTINCT
          labels.workflow AS DAG_Name,
          labels.task_id AS Task_Name,
          timestamp AS Running_task_start_time,
          PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%S', SUBSTR(labels.execution_date, 1, 19)) AS Dag_scheduled_start_time,
          FORMAT_DATE('%Y%m%d', DATE(labels.execution_date)) AS Dag_scheduled_date_run,
          textPayload  -- include textPayload for extraction
        FROM {project_id}.DAG_Details.airflow_worker
        WHERE CONTAINS_SUBSTR(textPayload, "Running <TaskInstance")
      ) B
      LEFT OUTER JOIN (
        SELECT DISTINCT
          REGEXP_EXTRACT(textPayload, r'dag_id=([^,]+)') AS DAG_Name,
          REGEXP_EXTRACT(textPayload, r'task_id=([^\s,]+)') AS Task_Name,
          REGEXP_EXTRACT(textPayload, r"execution_date=(\w+)") AS Task_scheduled_date,
          FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', PARSE_TIMESTAMP('%Y%m%dT%H%M%S', REGEXP_EXTRACT(textPayload, r"start_date=(\w+)"))) AS Task_start_time,
          FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', PARSE_TIMESTAMP('%Y%m%dT%H%M%S', REGEXP_EXTRACT(textPayload, r"end_date=(\w+)"))) AS Task_end_time,
          REGEXP_EXTRACT(textPayload, r'(?:Marking\stask\sas\s)(.+?)\.') AS Task_run_status
        FROM {project_id}.DAG_Details.airflow_worker
        WHERE CONTAINS_SUBSTR(textPayload, "Marking task as")
      ) C
        ON B.DAG_Name = C.DAG_Name
        AND B.Task_Name = C.Task_Name
        AND B.Dag_scheduled_start_time = PARSE_TIMESTAMP('%Y%m%dT%H%M%S', C.Task_scheduled_date)
    ),
    RankedTaskDurations AS (
      SELECT
        Dag_scheduled_date_run,
        Dag_scheduled_start_time,
        DAG_Name,
        Task_Name,
        MIN(C_Task_start_time) AS Min_Start_Time,
        MAX(C_Task_end_time) AS Max_End_Time,
        TIMESTAMP_DIFF(MAX(C_Task_end_time), MIN(C_Task_start_time), SECOND) AS Task_Duration_in_Second,
        CASE WHEN MAX(C_Task_end_time) IS NULL THEN MAX(Running_Task_start_time) ELSE MAX(C_Task_start_time) END AS Task_start_time,
        MAX(C_Task_end_time) AS Task_end_time,
        CASE WHEN MAX(C_Task_end_time) IS NOT NULL THEN Task_run_status ELSE 'Running' END AS Task_Status,
        ROW_NUMBER() OVER (
          PARTITION BY Dag_scheduled_date_run, Dag_scheduled_start_time, DAG_Name, Task_Name, Task_run_status
          ORDER BY MAX(C_Task_start_time) DESC
        ) AS RunRank
      FROM TaskInfo
      GROUP BY Dag_scheduled_date_run, Dag_scheduled_start_time, DAG_Name, Task_Name, Task_run_status
    ),
    LatestTaskDurations AS (
      SELECT
        Dag_scheduled_date_run,
        Dag_scheduled_start_time,
        DAG_Name,
        Task_Name,
        Task_Duration_in_Second,
        Task_start_time,
        Task_end_time,
        Task_Status
      FROM RankedTaskDurations
      WHERE RunRank = 1
    ),
    DAGDurations AS (
      SELECT
        Dag_scheduled_date_run,
        Dag_scheduled_start_time,
        DAG_Name,
        MAX(C_Task_end_time) AS Max_End_Time,
        MIN(C_Task_start_time) AS Min_Start_Time,
        TIMESTAMP_DIFF(MAX(C_Task_end_time), MIN(C_Task_start_time), SECOND) AS DAG_Total_Time
      FROM TaskInfo
      GROUP BY Dag_scheduled_date_run, Dag_scheduled_start_time, DAG_Name
    ),
    SummarizedResult AS (
      SELECT
        T.Dag_scheduled_date_run,
        T.Dag_scheduled_start_time,
        T.DAG_Name,
        T.Task_Name,
        T.Task_start_time,
        T.Task_end_time,
        T.Task_Duration_in_Second,
        D.DAG_Total_Time,
        CONCAT(
          FLOOR(T.Task_Duration_in_Second / 3600), 'h ',
          FLOOR(MOD(T.Task_Duration_in_Second, 3600) / 60), 'm ',
          MOD(T.Task_Duration_in_Second, 60), 's'
        ) AS Task_Duration_Formatted,
        CONCAT(
          FLOOR(D.DAG_Total_Time / 3600), 'h ',
          FLOOR(MOD(D.DAG_Total_Time, 3600) / 60), 'm ',
          MOD(D.DAG_Total_Time, 60), 's'
        ) AS DAG_Total_Time_Formatted,
        T.Task_Status,
        CASE
          WHEN MAX(CASE WHEN T.Task_Status = 'FAILED' THEN 1 ELSE 0 END)
               OVER (PARTITION BY T.Dag_scheduled_start_time, T.DAG_Name) = 1 THEN 'FAILED'
          WHEN MAX(CASE WHEN T.Task_Status = 'Running' THEN 1 ELSE 0 END)
               OVER (PARTITION BY T.Dag_scheduled_start_time, T.DAG_Name) = 1 THEN 'Running'
          ELSE 'SUCCESS'
        END AS DAG_Status
      FROM LatestTaskDurations T
      JOIN DAGDurations D
        ON T.Dag_scheduled_date_run = D.Dag_scheduled_date_run
        AND T.Dag_scheduled_start_time = D.Dag_scheduled_start_time
        AND T.DAG_Name = D.DAG_Name
    )
    SELECT
      A.Dag_scheduled_date_run,
      A.Dag_scheduled_start_time,
      A.DAG_Name,
      A.Task_Name,
      A.Task_start_time,
      A.Task_end_time,
      A.Task_Duration_in_Second,
      A.Task_Duration_Formatted,
      A.Task_Status,
      A.DAG_Total_Time,
      A.DAG_Total_Time_Formatted,
      A.DAG_Status
    FROM SummarizedResult A
    ORDER BY A.Dag_scheduled_date_run DESC, A.Dag_scheduled_start_time, A.DAG_Name, A.Task_Name;

Let me know if this helps you and feel free to add any feedback or suggestions.

Upvotes: 0

Betjens

Reputation: 1401

To complement the answer of user Bas Harenslak, here are some other options you can explore:

  • You can make use of TriggerDagRunOperator. With it, each of your DAGs can trigger one common DAG (a recap-dag) that populates the record into your destination dataset (a minimal sketch of such a recap-dag is included after this list).
from datetime import datetime

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_recap_dag = TriggerDagRunOperator(
    task_id="trigger_recap_dag",
    trigger_dag_id="recap-dag",
    wait_for_completion=False,
    allowed_states=['success'],
    conf={"Time": str(datetime.now()), "DAG": "recap-dag", "Status": "success"},
)

ingestion >> transformation >> save >> send_notification >> trigger_recap_dag 
  • If you see fit, this recap-dag can also be independent and run only every hour/day/week of your choosing, checking the status of your DAGs.
from datetime import datetime

from airflow import DAG
from airflow.models import DagRun

with DAG(
    'recap-dag',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:

    ...
    # Airflow >= 2.0.0
    # Inside a PythonOperator callable
    def GetRunningDagsInfo():
        dag_runs = DagRun.find(
            dag_id=your_dag_id,
            execution_start_date=your_start_date,
            execution_end_date=your_end_date,
        )

    ...
  • You can make use of the prior options and come up with a solution like this:

After your DAG (or DAGs) complete, they fire the trigger DAG. This recap-dag saves your DAG records into a custom table or file; then your independent DAG runs, retrieves the datasets that have been created so far, and pushes the data into your BigQuery table.

  • Another option is to look into your Airflow database to retrieve run information, known as Data Profiling. It has been deprecated in recent versions due to security concerns.
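
Coming back to the first option, here is a minimal sketch of what such a recap-dag could look like. The table name my_project.monitoring.dag_status, its columns, and the conf keys are illustrative assumptions matching the trigger example above, and the google-cloud-bigquery client library is assumed to be installed in the environment:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import bigquery

# Placeholder destination table; replace with your own project.dataset.table.
STATUS_TABLE = "my_project.monitoring.dag_status"

def save_dag_status(**context):
    """Read the payload passed via TriggerDagRunOperator's conf and insert it as one row."""
    conf = context["dag_run"].conf or {}
    row = {
        "date_time": conf.get("Time"),
        "dag_name": conf.get("DAG"),
        "status": conf.get("Status"),
    }
    errors = bigquery.Client().insert_rows_json(STATUS_TABLE, [row])
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")

with DAG(
    "recap-dag",
    schedule_interval=None,  # runs only when triggered by other DAGs
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    save_status = PythonOperator(
        task_id="save_dag_status",
        python_callable=save_dag_status,
    )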

Upvotes: 0

Daniel Zagales

Reputation: 3034

If you need the data in real time, I would go with something along the lines of the approach @Bas has suggested, maybe with Firestore or Cloud SQL. However, note his comments on the number of inserts per day if you go with BigQuery.

If you can wait for the results on a daily basis, you can create a log sink to BigQuery as described here: https://cloud.google.com/bigquery/docs/reference/auditlogs#stackdriver_logging_exports

In the filter criteria you can either bring in all of the Airflow logs or just the ones from the worker/scheduler.

Ex criteria:

resource.type="cloud_composer_environment"
logName="projects/{YOUR-PROJECT}/logs/airflow-worker"

In the log textPayload you will see something like:

Marking task as SUCCESS. dag_id=thing, task_id=stuff, execution_date=20220307T111111, start_date=20220307T114858, end_date=20220307T114859

You can then parse out what you need in BigQuery.

Upvotes: 0

Bas Harenslak

Reputation: 3084

There's no native out-of-the-box method to achieve this in Airflow. However, you could implement a function yourself that writes data to BigQuery and hook it into a DAG via the on_success_callback and on_failure_callback arguments.
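
For illustration, a minimal sketch of such callbacks, assuming the google-cloud-bigquery client library is available on the workers and that a destination table (here the placeholder my_project.monitoring.dag_status with date_time, dag_name, and status columns) already exists:

from datetime import datetime, timezone

from google.cloud import bigquery

# Placeholder destination table; replace with your own project.dataset.table.
STATUS_TABLE = "my_project.monitoring.dag_status"

def _record_dag_status(context, status):
    """Insert one row describing the finished DAG run into BigQuery."""
    row = {
        "date_time": datetime.now(timezone.utc).isoformat(),
        "dag_name": context["dag"].dag_id,
        "status": status,
    }
    errors = bigquery.Client().insert_rows_json(STATUS_TABLE, [row])
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")

def on_success(context):
    _record_dag_status(context, "success")

def on_failure(context):
    _record_dag_status(context, "failed")

# Attach the callbacks at the DAG level:
# DAG("my_dag", ..., on_success_callback=on_success, on_failure_callback=on_failure)

Note that insert_rows_json uses the streaming insert API, so the batching caveat below applies if you have a high volume of DAG runs.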

Note: BigQuery is not a transactional database and has limits on the number of inserts per day. For a large number of DAG runs, you might want to write results to BigQuery in batches.

Upvotes: 1
