Reputation: 39
I want to write the final status (success/failure) of an Airflow DAG to a table in BigQuery. The table would contain columns such as Date-Time, DAG-Name, and Status, and would be populated according to the final status of each DAG run.
Please help; how can this be achieved?
Upvotes: 1
Views: 1836
Reputation: 1
I have used a log router to send Airflow worker logs to BigQuery tables and then used that table to create a view that extracts the DAG name, scheduled date, tasks, task start time, end time, duration, DAG duration, and DAG status.
Here is the approach:
A log router to load worker logs into a BigQuery table, using the filter (resource.type="cloud_composer_environment" logName="projects/{project_id}/logs/airflow-worker")
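If you would rather create the sink programmatically than in the console, a minimal sketch with the google-cloud-logging client might look like this (project, dataset, and sink names are placeholders, and the sink's writer identity still needs write access to the destination dataset):

from google.cloud import logging

client = logging.Client(project="your-project")  # placeholder project
sink = client.sink(
    "airflow-worker-to-bq",  # placeholder sink name
    filter_=(
        'resource.type="cloud_composer_environment" '
        'logName="projects/your-project/logs/airflow-worker"'
    ),
    # BigQuery dataset that will receive the airflow_worker table
    destination="bigquery.googleapis.com/projects/your-project/datasets/DAG_Details",
)
sink.create()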
Used the airflow_worker table to create a view (modify according to your requirements):
WITH TaskInfo AS (
SELECT
DISTINCT B.DAG_Name,
B.Task_Name,
B.Dag_scheduled_date_run,
B.Dag_scheduled_start_time AS Dag_scheduled_start_time,
B.Running_Task_start_time,
CAST(C.Task_start_time AS TIMESTAMP) AS C_Task_start_time,
CAST(C.Task_end_time AS TIMESTAMP) AS C_Task_end_time,
C.Task_run_status
FROM (
SELECT
DISTINCT labels.workflow AS DAG_Name,
labels.task_id AS Task_Name,
timestamp AS Running_Task_start_time,
PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%S',SUBSTR(labels.execution_date, 1, 19)) AS Dag_scheduled_start_time,
FORMAT_DATE('%Y%m%d', DATE(labels.execution_date)) AS Dag_scheduled_date_run,
textpayload -- Include textpayload for extraction
FROM
`{project_id}.DAG_Details.airflow_worker`
WHERE
CONTAINS_SUBSTR(textPayload, "Running <TaskInstance")
) B
LEFT OUTER JOIN (
SELECT
DISTINCT REGEXP_EXTRACT(textPayload, r'dag_id=([^,]+)') AS DAG_Name,
REGEXP_EXTRACT(textPayload, r'task_id=([^\s,]+)') AS Task_Name,
REGEXP_EXTRACT(textPayload, r"execution_date=(\w+)") AS Task_scheduled_date,
FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', PARSE_TIMESTAMP('%Y%m%dT%H%M%S', (REGEXP_EXTRACT(textPayload, r"start_date=(\w+)")))) AS Task_start_time,
FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%S', PARSE_TIMESTAMP('%Y%m%dT%H%M%S', (REGEXP_EXTRACT(textPayload, r"end_date=(\w+)")))) AS Task_end_time,
REGEXP_EXTRACT(textPayload, r'(?:Marking\stask\sas\s)(.+?)\.') AS Task_run_status
FROM
`{project_id}.DAG_Details.airflow_worker`
WHERE
CONTAINS_SUBSTR(textPayload, "Marking task as")
) C
ON
B.DAG_Name = C.DAG_Name
AND B.Task_Name = C.Task_Name
AND B.Dag_scheduled_start_time = PARSE_TIMESTAMP('%Y%m%dT%H%M%S', C.task_scheduled_date)
),
RankedTaskDurations AS (
SELECT
Dag_scheduled_date_run,
Dag_scheduled_start_time,
DAG_Name,
Task_Name,
MIN(C_Task_start_time) AS Min_Start_Time,
MAX(C_Task_end_time) AS Max_End_Time,
TIMESTAMP_DIFF(MAX(C_Task_end_time), MIN(C_Task_start_time), SECOND) AS Task_Duration_in_Second,
CASE
WHEN MAX(C_Task_end_time) IS NULL THEN MAX(Running_Task_start_time)
ELSE MAX(C_Task_start_time)
END AS Task_start_time,
MAX(C_Task_end_time) AS Task_end_time,
CASE
WHEN MAX(C_Task_end_time) IS NOT NULL THEN Task_run_status
ELSE 'Running'
END AS Task_Status,
ROW_NUMBER() OVER (PARTITION BY Dag_scheduled_date_run, Dag_scheduled_start_time, DAG_Name, task_name, task_run_status ORDER BY MAX(C_Task_start_time) DESC) AS RunRank
FROM TaskInfo
GROUP BY Dag_scheduled_date_run, Dag_scheduled_start_time, DAG_Name, Task_Name, Task_run_status
),
LatestTaskDurations AS (
SELECT
Dag_scheduled_date_run,
Dag_scheduled_start_time,
DAG_Name,
Task_Name,
Task_Duration_in_Second,
Task_start_time,
Task_end_time,
Task_Status
FROM RankedTaskDurations
WHERE RunRank = 1
),
DAGDurations AS (
SELECT
Dag_scheduled_date_run,
Dag_scheduled_start_time,
DAG_Name,
MAX(C_Task_end_time) AS Max_End_Time,
MIN(C_Task_start_time) AS Min_Start_Time,
TIMESTAMP_DIFF(MAX(C_Task_end_time), MIN(C_Task_start_time), SECOND) AS DAG_Total_Time
FROM TaskInfo
GROUP BY Dag_scheduled_date_run,Dag_scheduled_start_time,DAG_Name
),
SummarizedResult AS (
SELECT
T.Dag_scheduled_date_run,
T.Dag_scheduled_start_time,
T.DAG_Name,
T.Task_Name,
T.Task_start_time,
T.Task_end_time,
T.Task_Duration_in_Second,
D.DAG_Total_Time,
CONCAT(
FLOOR(T.Task_Duration_in_second / 3600), 'h ',
FLOOR(MOD(T.Task_Duration_in_second, 3600) / 60), 'm ',
MOD(T.Task_Duration_in_second, 60), 's'
) AS Task_Duration_Formatted,
CONCAT(
FLOOR(D.DAG_Total_Time / 3600), 'h ',
FLOOR(MOD(D.DAG_Total_Time, 3600) / 60), 'm ',
MOD(D.DAG_Total_Time, 60), 's'
) AS DAG_Total_Time_Formatted,
T.Task_Status,
CASE
WHEN MAX(CASE WHEN T.Task_Status = 'FAILED' THEN 1 ELSE 0 END) OVER (PARTITION BY T.Dag_scheduled_start_time, T.DAG_Name) = 1 THEN 'FAILED'
WHEN MAX(CASE WHEN T.Task_Status = 'Running' THEN 1 ELSE 0 END) OVER (PARTITION BY T.Dag_scheduled_start_time, T.DAG_Name) = 1 THEN 'Running'
ELSE 'SUCCESS'
END AS DAG_Status
FROM LatestTaskDurations T
JOIN DAGDurations D ON T.Dag_scheduled_date_run = D.Dag_scheduled_date_run
AND T.Dag_scheduled_start_time = D.Dag_scheduled_start_time
AND T.DAG_Name = D.DAG_Name
ORDER BY T.Dag_scheduled_date_run DESC, T.Dag_scheduled_start_time, T.DAG_Name, T.Task_Name
)
SELECT
A.Dag_scheduled_date_run,
A.Dag_scheduled_start_time,
A.DAG_Name,
A.Task_Name,
A.Task_start_time,
A.Task_end_time,
A.Task_Duration_in_Second,
A.Task_Duration_Formatted,
A.Task_Status,
A.DAG_Total_Time,
A.DAG_Total_Time_Formatted,
A.DAG_Status
FROM SummarizedResult A
ORDER BY A.Dag_scheduled_date_run DESC, A.Dag_scheduled_start_time, A.DAG_Name, A.Task_Name;
Let me know if this helps you and feel free to add any feedback or suggestions.
Upvotes: 0
Reputation: 1401
To complement the answer of user Bas Harenslak, there are these options you can also explore:
Create a new DAG (recap-dag) which will be referenced by your DAGs to populate the record into your destination dataset:
trigger_recap_dag = TriggerDagRunOperator(
    task_id="trigger_recap_dag",
    trigger_dag_id="recap-dag",
    wait_for_completion=False,
    allowed_states=['success'],
    # conf must be JSON-serializable, so the timestamp is passed as a string
    conf={"Time": str(datetime.now()), "DAG": "recap-dag", "Status": "success"}
)
ingestion >> transformation >> save >> send_notification >> trigger_recap_dag
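On the receiving side, the triggered recap-dag can read that payload from dag_run.conf. A minimal sketch of such a task, assuming a PythonOperator (task and variable names are illustrative):

from airflow.operators.python import PythonOperator

def save_status(**context):
    # Payload passed through TriggerDagRunOperator's conf argument
    payload = context["dag_run"].conf or {}
    # Insert the row into BigQuery here, e.g. with google-cloud-bigquery
    print(payload.get("Time"), payload.get("DAG"), payload.get("Status"))

save_status_task = PythonOperator(
    task_id="save_status",
    python_callable=save_status,
)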
This recap-dag can also be independent, run every hour/day/week of your choosing, and check your DAGs' status:
with DAG(
'recap-dag',
schedule_interval='@daily',
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
...
# Airflow >= 2.0.0
# Inside a PythonOperator callable
from airflow.models import DagRun

def GetRunningDagsInfo():
    # your_dag_id, your_start_date and your_end_date are placeholders
    dag_runs = DagRun.find(
        dag_id=your_dag_id,
        execution_start_date=your_start_date,
        execution_end_date=your_end_date,
    )
    ...
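From there, the found runs can be shaped into rows for your status table; a small sketch (the column names are only an example, not from the original answer):

# Shape DagRun results into rows for a BigQuery status table
rows = [
    {
        "run_ts": dr.execution_date.isoformat(),
        "dag_name": dr.dag_id,
        "status": dr.state,
    }
    for dr in dag_runs
]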
After your DAG (or DAGs) complete, the trigger task will fire the recap-dag, which saves your DAG records into a custom table or file; alternatively, your independent recap-dag runs on its own schedule, retrieves the DAG runs recorded so far, and pushes the data into your BigQuery table.
Upvotes: 0
Reputation: 3034
If you need the data in real time, I would go with something along the lines of the approach @Bas has suggested, maybe with Firestore or Cloud SQL. However, note his comments on the inserts per day if you go with BigQuery.
If you can wait for the results on a daily basis, you can set up a log sink to BigQuery as described here: https://cloud.google.com/bigquery/docs/reference/auditlogs#stackdriver_logging_exports
In the filter criteria you can either bring in all of the Airflow logs or just the ones from the worker/scheduler.
Ex criteria:
resource.type="cloud_composer_environment"
logName="projects/{YOUR-PROJECT}/logs/airflow-worker"
In the log textPayload you will see something like:
Marking task as SUCCESS. dag_id=thing, task_id=stuff, execution_date=20220307T111111, start_date=20220307T114858, end_date=20220307T114859
You can then parse out what you need in BigQuery.
Upvotes: 0
Reputation: 3084
There's no native out-of-the-box method to achieve this in Airflow. However, you could implement a function yourself which writes data to BigQuery and run it via a DAG's on_success_callback and on_failure_callback methods.
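A minimal sketch of such a callback, assuming the google-cloud-bigquery client and a pre-created status table (the project, dataset, table, and column names below are placeholders, not a fixed schema):

from datetime import datetime, timezone

from airflow import DAG
from google.cloud import bigquery

def record_dag_status(context, status):
    # Streams one row per DAG run into a pre-created BigQuery table
    client = bigquery.Client()
    row = {
        "run_ts": datetime.now(timezone.utc).isoformat(),
        "dag_name": context["dag"].dag_id,
        "status": status,
    }
    client.insert_rows_json("your-project.your_dataset.dag_status", [row])

with DAG(
    "my_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    on_success_callback=lambda context: record_dag_status(context, "success"),
    on_failure_callback=lambda context: record_dag_status(context, "failed"),
) as dag:
    ...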
Note: BigQuery is not a transactional database and has limits on the number of inserts per day. For a large number of DAG runs, you might want to think of writing results in batches to BigQuery.
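If the insert limits do become a concern, one way to batch is to buffer rows and submit them with a load job instead of row-by-row streaming inserts; a sketch under that assumption (buffered_rows and the table name are illustrative):

from google.cloud import bigquery

client = bigquery.Client()
# buffered_rows: a list of dicts accumulated across many DAG runs
load_job = client.load_table_from_json(
    buffered_rows,
    "your-project.your_dataset.dag_status",
)
load_job.result()  # wait for the batch load to complete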
Upvotes: 1