Reputation: 33
I have an Airflow DAG on GCP Composer that runs every 5 minutes. I would like to create a BigQuery table that records the time each DAG run starts and a flag identifying whether the run succeeded or failed. For example, if the DAG runs at 2020-03-23 02:30 and the run fails, the table will get a row with 2020-03-23 02:30 in the time column and 1 in the flag column; if the run succeeds, the flag column will be 0. The table should append new rows.
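For concreteness, the table I have in mind could be created like this (a sketch using the google-cloud-bigquery client; the project, dataset, and table names are placeholders):

from google.cloud import bigquery

client = bigquery.Client(project='my_project')  # placeholder project id

schema = [
    bigquery.SchemaField('time', 'TIMESTAMP'),  # DAG run start time
    bigquery.SchemaField('flag', 'INTEGER'),    # 0 = success, 1 = failure
]

# Create the table once; each DAG run should then append one row.
table = bigquery.Table('my_project.my_dataset.dag_status', schema=schema)
client.create_table(table, exists_ok=True)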
Thanks in advance
Upvotes: 0
Views: 1021
Reputation: 33
Based on the answer by @Enrique, here is my final solution.
import pandas as pd

from airflow.models import DagRun
from airflow.operators.python_operator import PythonOperator

def status_check(**kwargs):
    # Collect the state and execution date of every run of this DAG.
    dag_id = 'dag_id'
    dag_runs = DagRun.find(dag_id=dag_id)

    states = []
    times = []
    for dag_run in dag_runs:
        states.append(dag_run.state)
        times.append(dag_run.execution_date)

    df = pd.DataFrame({'dag_status': states, 'time': times})

    # DagRun.find returns the full run history, so the table is rewritten
    # on each run ('replace') rather than appended to, avoiding duplicates.
    project_name = "project_name"
    dataset = "Dataset"
    output_table = '{}.dag_status_tab'.format(dataset)
    df.to_gbq(output_table,
              project_id=project_name,
              if_exists='replace',
              progress_bar=False,
              table_schema=[{'name': 'dag_status', 'type': 'STRING'},
                            {'name': 'time', 'type': 'TIMESTAMP'}])
    return None

Dag_status = PythonOperator(
    task_id='Dag_status',
    python_callable=status_check,
)
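Note that if_exists='replace' rewrites the whole status table on every run. If you instead want the append-only behavior described in the question, a minimal sketch (untested; same placeholder project and dataset names) is to write only the most recent run with if_exists='append':

def append_latest_status(**kwargs):
    import pandas as pd
    from airflow.models import DagRun

    dag_runs = DagRun.find(dag_id='dag_id')
    if not dag_runs:
        return None
    # Pick the most recent run and append a single row for it.
    latest = max(dag_runs, key=lambda run: run.execution_date)
    df = pd.DataFrame({'dag_status': [latest.state],
                       'time': [latest.execution_date]})
    df.to_gbq('Dataset.dag_status_tab',
              project_id='project_name',
              if_exists='append',
              progress_bar=False)
    return None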
Upvotes: 1
Reputation: 835
You can use the airflow list_dag_runs CLI command to list the DAG runs for a given dag_id. The information returned includes the state of each run.
Another option is to retrieve the information via Python code, and there are a few different ways. One approach I've used in the past is the 'find' method in airflow.models.dagrun.DagRun.
from airflow.models import DagRun

dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=dag_id)
for dag_run in dag_runs:
    print(dag_run.state)
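If you only need runs in a particular state, DagRun.find also accepts a state filter (a small sketch; 'my_dag' is a placeholder):

from airflow.models import DagRun
from airflow.utils.state import State

# Fetch only the failed runs of the DAG.
failed_runs = DagRun.find(dag_id='my_dag', state=State.FAILED)
for dag_run in failed_runs:
    print(dag_run.execution_date, dag_run.state)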
Finally, use the BigQuery operator to write the DAG information into a BigQuery table. You can find an example of how to use the BigQueryOperator here.
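A minimal sketch of that last step, assuming the Airflow 1.10 contrib import path, a dag object in scope, and placeholder project/dataset/table names:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Appends one row per run: the run's logical timestamp and a success flag.
# flag is hard-coded to 0 here because this task only executes when its
# upstream tasks succeed; a failure row would need a separate task with an
# appropriate trigger_rule.
write_status = BigQueryOperator(
    task_id='write_status',
    sql="SELECT TIMESTAMP('{{ ts }}') AS time, 0 AS flag",
    destination_dataset_table='my_project.my_dataset.dag_status',
    write_disposition='WRITE_APPEND',
    use_legacy_sql=False,
    dag=dag,
)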
Upvotes: 1