hlagos

Reputation: 7947

Fetch results from BigQueryOperator in airflow

I am trying to fetch results from a BigQueryOperator in Airflow, but I could not find a way to do it. I tried calling the next() method on the bq_cursor member (available in 1.10), but it returns None. This is how I tried to do it:

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

Taking a look at bigquery_hook.py and bigquery_operator.py, it seems to be the only available way to fetch the results.

Upvotes: 6

Views: 28224

Answers (5)

simbo1905

Reputation: 6822

The Google provided operators use BigQueryHook to get an authenticated connection to BigQuery. That class is the one that resolves the Airflow Connection and creates the Google Cloud credentials. You import it with:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

The latest docs say that it has a method "get_client()" that should return the authenticated underlying client. That didn't work on my version of Airflow, so I used this answer to create a bigquery.client.Client directly from the internal fields of the hook. Here is the logic:

from google.cloud import bigquery

def read_files_loaded_from_bq(bigquery_conn_id, sql):
    hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None, use_legacy_sql=False)
    # this should work with latest version:
    # client = hook.get_client()
    # instead directly create a client using the internals of the hook:
    client = bigquery.Client(project=hook._get_field("project"), credentials=hook._get_credentials())
    query_job = client.query(sql)
    files = []
    for row in query_job:
        file_name = row["file_name"]
        files.append(file_name)
    result = ','.join(files)
    print('found files:', result)
    return result

read_files_loaded = PythonOperator(
    task_id='read_files_loaded',
    provide_context=False,
    python_callable=read_files_loaded_from_bq,
    op_kwargs = {
        "bigquery_conn_id": BIGQUERY_CONN_ID,
        "sql": "select file_name from myproj.mydataset.files_loaded"
    },
    dag=dag
)

That would run the query and load the data into a string. You can then use the following to read the results using xcom:

"{{ task_instance.xcom_pull(task_ids='read_files_loaded') }}"

This approach seems straightforward if you are just loading a small amount of metadata to drive the logic of the DAG. If there were any serious amount of data, it would probably be better to use the operators to move data between tables and buckets without ever pulling it into the actual Airflow task process.
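For example, a downstream task in the same DAG could consume that XCom value. Here is a minimal sketch, assuming a second PythonOperator (the task and function names below are illustrative, not from the original answer):

from airflow.operators.python_operator import PythonOperator

def process_file_list(**kwargs):
    # Pull the comma-separated string returned by the upstream task
    files_csv = kwargs['ti'].xcom_pull(task_ids='read_files_loaded')
    for file_name in files_csv.split(','):
        print('processing', file_name)

process_files = PythonOperator(
    task_id='process_files',
    provide_context=True,
    python_callable=process_file_list,
    dag=dag
)

read_files_loaded >> process_files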

Upvotes: 2

hlagos

Reputation: 7947

Thanks to @kaxil and @Mike for their answers. I found the problem. There is a kind of bug (in my mind) in the BigQueryCursor. As part of run_with_configuration, the running_job_id is returned but never assigned to job_id, which is what the next method uses to pull the results. A workaround (not really elegant, but good if you do not want to re-implement everything) is to assign job_id from running_job_id in the hook, like this:

big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())

Upvotes: 2

Sharing a small example of how to use the BigQuery hook to fetch data.

Below is my demo_bigquery_hook file:

from airflow import DAG
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from datetime import *
import logging

logger = logging.getLogger("airflow.task")

# default arguments
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

# initializing dag
dag = DAG(
    'test_bigquery_hook',
    default_args=default_args,
    catchup=False,
    schedule_interval=None,
    max_active_runs=1
)

def get_data_from_bq(**kwargs):
    hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute('SELECT owner_display_name, title, view_count FROM `bigquery-public-data.stackoverflow.posts_questions` WHERE creation_date > "2020-09-09" ORDER BY view_count DESC LIMIT 2')
    result = cursor.fetchall()
    print('result', result)
    return result

fetch_data = PythonOperator(
    task_id='fetch_data_public_dataset',
    provide_context=True,
    python_callable=get_data_from_bq,
    dag=dag
)

fetch_data

To test it locally, save the above content in a 'demo_bigquery_hook.py' file and copy it to your dags folder. Open a command prompt and execute the commands below:

  1. export AIRFLOW_CONN_BIGQUERY_DEFAULT="google-cloud-platform://?extra__google_cloud_platform__project=<gcp_project_id>". Replace <gcp_project_id> with any GCP project ID. This command sets up the default GCP connection.

  2. export GOOGLE_APPLICATION_CREDENTIALS=<path_to_your_sa_key>, where <path_to_your_sa_key> is the path to your GCP project's service account key.

  3. Finally, run the command below: airflow test test_bigquery_hook fetch_data_public_dataset 2020-09-02T05:38:00+00:00. Upon running it, you will see the query result printed in the task output.


A few points to note:

  1. The query used in this example fetches results from a public dataset provided by GCP.

  2. The BigQuery cursor object provides a whole lot of other functions as well; see the BigQueryCursor documentation to go through them, and the short sketch after this list for a few of them.

  3. The Airflow version used for this example is 1.10.12.
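As a quick illustration of point 2, here is a minimal sketch using a few of the cursor's standard DB-API style methods, reusing the hook setup from the DAG above (the query itself is illustrative):

hook = BigQueryHook(bigquery_conn_id='bigquery_default', use_legacy_sql=False)
cursor = hook.get_conn().cursor()
cursor.execute('SELECT title, view_count FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10')

first_row = cursor.fetchone()    # a single row (list of values) or None
next_rows = cursor.fetchmany(5)  # up to five more rows
the_rest = cursor.fetchall()     # everything that is left
print(first_row, len(next_rows), len(the_rest))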

Upvotes: 3

Mike

Reputation: 2614

I create my own operator using the BigQuery hook whenever I need to get the data from a BigQuery query and use it for something. I usually call this a BigQueryToXOperator, and we have a bunch of these for sending BigQuery data to other internal systems.

For example, I have a BigQueryToPubSub operator that you might find useful as an example of how to query BigQuery and then handle the results on a row-by-row basis, sending them to Google PubSub. Consider the following generalized sample code for how to do this on your own:

from json import dumps  # used to serialize each row to JSON

from airflow.models import BaseOperator
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.decorators import apply_defaults


class BigQueryToXOperator(BaseOperator):
    template_fields = ['sql']
    ui_color = '#000000'

    @apply_defaults
    def __init__(
            self,
            sql,
            keys,
            bigquery_conn_id='bigquery_default',
            delegate_to=None,
            *args,
            **kwargs):
        super(BigQueryToXOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.keys = keys # A list of keys for the columns in the result set of sql
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to


    def execute(self, context):
        """
        Run query and handle results row by row.
        """
        cursor = self._query_bigquery()
        for row in cursor.fetchall():
            # Zip keys and row together because the cursor returns a list of lists (not a list of dicts)
            row_dict = dumps(dict(zip(self.keys,row))).encode('utf-8')

            # Do what you want with the row (handle_row is a placeholder for your own logic)...
            handle_row(row_dict)


    def _query_bigquery(self):
        """
        Queries BigQuery and returns a cursor to the results.
        """
        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                          use_legacy_sql=False)
        conn = bq.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        return cursor

Upvotes: 7

kaxil

Reputation: 18844

You can use BigQueryOperator to save the results in a temporary destination table, then use BigQueryGetDataOperator to fetch the results as below, and finally use BigQueryTableDeleteOperator to delete the table:

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)
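For completeness, a minimal sketch of the full three-step flow could look like the following, assuming a dag object is already defined. The temporary table name and the SQL are illustrative, not from the original answer; BigQueryGetDataOperator pushes the fetched rows to XCom, so a downstream task can pull them from there:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator

# 1. Run the query and materialize the result into a temporary table
save_results = BigQueryOperator(
    task_id='save_results_to_tmp_table',
    sql='SELECT DATE FROM test_dataset.Transaction_partitions',
    destination_dataset_table='test_dataset.tmp_query_results',
    write_disposition='WRITE_TRUNCATE',
    use_legacy_sql=False,
    bigquery_conn_id='airflow-service-account',
    dag=dag
)

# 2. Fetch rows from the temporary table (pushed to XCom as a list of rows)
get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='tmp_query_results',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account',
    dag=dag
)

# 3. Drop the temporary table once the data has been read
delete_tmp_table = BigQueryTableDeleteOperator(
    task_id='delete_tmp_table',
    deletion_dataset_table='test_dataset.tmp_query_results',
    bigquery_conn_id='airflow-service-account',
    dag=dag
)

save_results >> get_data >> delete_tmp_table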


Upvotes: 4
