moeen

Reputation: 33

How to create a BigQuery table with Airflow failure notification?

I have an Airflow DAG on GCP Composer that runs every 5 minutes. I would like to create a BigQuery table that records the time each DAG run starts and a flag indicating whether the run succeeded or failed. For example, if the DAG runs at 2020-03-23 02:30 and the run fails, the BigQuery table will get a row with 2020-03-23 02:30 in the time column and 1 in the flag column. If the run succeeds, the flag column will be 0. The table should append new rows.
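Something like the sketch below is the behaviour I have in mind (a minimal sketch on my side, assuming DAG-level on_success_callback/on_failure_callback and a pre-created table my_project.my_dataset.dag_status; all names are placeholders), but I'm not sure it's the right approach:

from google.cloud import bigquery

TABLE = "my_project.my_dataset.dag_status"  # placeholder for a pre-created table

def record_run(context, flag):
    """Append one row: the run's start time plus a 0/1 success flag."""
    client = bigquery.Client()
    row = {"time": context["execution_date"].isoformat(), "flag": flag}
    client.insert_rows_json(TABLE, [row])

def on_success(context):
    record_run(context, 0)  # 0 = successful run

def on_failure(context):
    record_run(context, 1)  # 1 = failed run

# Passed to the DAG, e.g.:
# dag = DAG(..., on_success_callback=on_success, on_failure_callback=on_failure)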

Thanks in advance

Upvotes: 0

Views: 1021

Answers (2)

moeen

Reputation: 33

Based on the solution by @Enrique, here is my final solution:

import pandas as pd  # df.to_gbq also needs the pandas-gbq package installed

from airflow.models import DagRun
from airflow.operators.python_operator import PythonOperator


def status_check(**kwargs):
    """Collect the state and execution time of every run of this DAG
    and write them to a BigQuery table."""
    dag_id = 'dag_id'
    dag_runs = DagRun.find(dag_id=dag_id)

    states = []
    run_times = []
    for dag_run in dag_runs:
        states.append(dag_run.state)
        run_times.append(dag_run.execution_date)

    df = pd.DataFrame({'dag_status': states, 'time': run_times})

    project_name = "project_name"
    dataset = "Dataset"
    output_table = dataset + '.dag_status_tab'

    # DagRun.find returns the whole run history, so the table is
    # rebuilt with if_exists='replace' rather than appended to.
    df.to_gbq(output_table, project_id=project_name,
              if_exists='replace', progress_bar=False,
              table_schema=[{'name': 'dag_status', 'type': 'STRING'},
                            {'name': 'time', 'type': 'TIMESTAMP'}])


Dag_status = PythonOperator(
    task_id='Dag_status',
    python_callable=status_check,
    provide_context=True,  # Airflow 1.x: pass the run context into **kwargs
)
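A note on wiring (my assumption, not part of the original answer): if the status task should run even when earlier tasks fail, it can be given trigger_rule='all_done' and chained after the DAG's last real task, along these lines:

# Variant of the task above; final_task is a placeholder for the
# DAG's last real task. With trigger_rule='all_done' the status check
# runs whether the upstream tasks succeeded or failed.
Dag_status = PythonOperator(
    task_id='Dag_status',
    python_callable=status_check,
    trigger_rule='all_done',
)
final_task >> Dag_status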

Upvotes: 1

Enrique Zetina

Reputation: 835

You can use the list_dag_runs CLI command to list the DAG runs for a given dag_id; the information returned includes the state of each run.
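For example (this is the Airflow 1.10 CLI syntax; in Airflow 2.x the equivalent is airflow dags list-runs -d <dag_id>):

airflow list_dag_runs my_dag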

Another option is to retrieve this information with Python code. One approach I've used in the past is the find method of airflow.models.dagrun.DagRun:

from airflow.models import DagRun

dag_id = 'my_dag'
dag_runs = DagRun.find(dag_id=dag_id)
for dag_run in dag_runs:
    print(dag_run.state)

Finally, use the BigQuery operator to write the DAG information into a BigQuery table. You can find an example of how to use the BigQueryOperator in the Airflow documentation.
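A minimal sketch of what that could look like (my assumption, using Airflow 1.10's contrib BigQueryOperator; project.Dataset.dag_status_tab is a placeholder table, and the trigger rules make one task fire on success and the other on any upstream failure):

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Appends one row per run: the run's start time plus flag 0 (success) or 1 (failure).
record_success = BigQueryOperator(
    task_id='record_success',
    sql="SELECT TIMESTAMP('{{ ts }}') AS time, 0 AS flag",
    destination_dataset_table='project.Dataset.dag_status_tab',
    write_disposition='WRITE_APPEND',
    use_legacy_sql=False,
    trigger_rule='all_success',
)

record_failure = BigQueryOperator(
    task_id='record_failure',
    sql="SELECT TIMESTAMP('{{ ts }}') AS time, 1 AS flag",
    destination_dataset_table='project.Dataset.dag_status_tab',
    write_disposition='WRITE_APPEND',
    use_legacy_sql=False,
    trigger_rule='one_failed',
)

# Both tasks must sit downstream of the DAG's real work for the trigger
# rules to take effect, e.g. final_task >> [record_success, record_failure]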

Upvotes: 1
