Reputation: 7947
I am trying to fetch results from a BigQueryOperator using Airflow, but I could not find a way to do it. I tried calling the next() method on the bq_cursor member (available in 1.10), however it returns None. This is how I tried to do it:
import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)


def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())


default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator
Taking a look at bigquery_hook.py and bigquery_operator.py, it seems to be the only available way to fetch the results.
Upvotes: 6
Views: 28224
Reputation: 6822
The Google-provided operators use BigQueryHook to get an authenticated connection to BigQuery. That class is the one that resolves the Airflow Connection and creates the Google Cloud credentials. You import it with:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

The latest docs say that it has a method get_client() that should return the authenticated underlying client. That didn't work on my version of Airflow, so I used this answer to directly create a bigquery.client.Client connection from the internal fields of the hook. Here is the logic:
from google.cloud import bigquery

def read_files_loaded_from_bq(bigquery_conn_id, sql):
    hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, delegate_to=None, use_legacy_sql=False)
    # this should work with the latest version:
    # client = hook.get_client()
    # instead, directly create a client using the internals of the hook:
    client = bigquery.Client(project=hook._get_field("project"), credentials=hook._get_credentials())
    query_job = client.query(sql)
    files = []
    for row in query_job:
        file_name = row["file_name"]
        files.append(file_name)
    result = ','.join(files)
    print('found files:', result)
    return result
read_files_loaded = PythonOperator(
    task_id='read_files_loaded',
    provide_context=False,
    python_callable=read_files_loaded_from_bq,
    op_kwargs={
        "bigquery_conn_id": BIGQUERY_CONN_ID,
        "sql": "select file_name from myproj.mydataset.files_loaded"
    },
    dag=dag
)
That would run the query and load the data into a string. You can then use the following to read the results using xcom:
"{{ task_instance.xcom_pull(task_ids='read_files_loaded') }}"
This approach seems straightforward if you are just loading a small amount of metadata to drive the logic of the DAG. If there were any serious amount of data, it would probably be better to use the operators to move data between tables and buckets without ever pulling it into the actual Airflow task process.
Upvotes: 2
Reputation: 7947
Thanks to @kaxil and @Mike for their answers. I found the problem. There is a kind of bug (in my mind) in the BigQueryCursor. As part of run_with_configuration, the running_job_id is being returned but never assigned to job_id, which is used to pull the results in the next method. A workaround (not really elegant, but good if you do not want to re-implement everything) is to assign the job_id based on the running_job_id in the hook, like this:
big_query_count.execute(context=kwargs)
# workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())
Upvotes: 2
Reputation: 1366
Sharing a small example on how to use the BigQuery hook to fetch data.
Below is my demo_bigquery_hook file:
from airflow import DAG
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
import logging

logger = logging.getLogger("airflow.task")

# default arguments
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

# initializing dag
dag = DAG(
    'test_bigquery_hook',
    default_args=default_args,
    catchup=False,
    schedule_interval=None,
    max_active_runs=1
)


def get_data_from_bq(**kwargs):
    hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute('SELECT owner_display_name, title, view_count FROM `bigquery-public-data.stackoverflow.posts_questions` WHERE creation_date > "2020-09-09" ORDER BY view_count DESC LIMIT 2')
    result = cursor.fetchall()
    print('result', result)
    return result


fetch_data = PythonOperator(
    task_id='fetch_data_public_dataset',
    provide_context=True,
    python_callable=get_data_from_bq,
    dag=dag
)

fetch_data
To test it locally, save the above content in a 'demo_bigquery_hook.py' file and copy it to your dags folder. Open a command prompt and execute the below commands:

export AIRFLOW_CONN_BIGQUERY_DEFAULT="google-cloud-platform://?extra__google_cloud_platform__project=<gcp_project_Id>"

Replace <gcp_project_Id> with any GCP project Id. This command sets up the default GCP connection.

export GOOGLE_APPLICATION_CREDENTIALS=<path_to_your_sa_key>

where <path_to_your_sa_key> is the path to your GCP project's service account key.

Finally, run the below command:

airflow test test_bigquery_hook fetch_data_public_dataset 2020-09-02T05:38:00+00:00

Upon running it, you will see the below result.
A few points to note:
The query used in this example fetches results from a public dataset provided by GCP.
The BigQuery cursor object provides a whole lot of other functions as well (a short sketch of two of them follows this list). Open this link to go through them.
The Airflow version used for this example is 1.10.12.
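For reference, a minimal sketch of two of those cursor methods, assuming the same hook/cursor setup as in get_data_from_bq above (the query itself is illustrative only):

# Inside a callable like get_data_from_bq, after cursor = conn.cursor():
cursor.execute('SELECT title FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10')
first_row = cursor.fetchone()    # returns a single row as a sequence, or None when exhausted
next_rows = cursor.fetchmany(5)  # returns up to 5 of the remaining rows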
Upvotes: 3
Reputation: 2614
I create my own operator using the BigQuery hook whenever I need to get the data from a BigQuery query and use it for something. I usually call this a BigQueryToXOperator, and we have a bunch of these for sending BigQuery data to other internal systems.
For example, I have a BigQueryToPubSub operator that you might find useful as an example of how to query BigQuery and then handle the results on a row-by-row basis, sending them to Google PubSub. Consider the following generalized sample code for how to do this on your own:
from json import dumps

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryToXOperator(BaseOperator):

    template_fields = ['sql']
    ui_color = '#000000'

    @apply_defaults
    def __init__(
            self,
            sql,
            keys,
            bigquery_conn_id='bigquery_default',
            delegate_to=None,
            *args,
            **kwargs):
        super(BigQueryToXOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.keys = keys  # A list of keys for the columns in the result set of sql
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        """
        Run query and handle results row by row.
        """
        cursor = self._query_bigquery()
        for row in cursor.fetchall():
            # Zip keys and row together because the cursor returns a list of lists (not a list of dicts)
            row_dict = dumps(dict(zip(self.keys, row))).encode('utf-8')

            # Do what you want with the row...
            handle_row(row_dict)

    def _query_bigquery(self):
        """
        Queries BigQuery and returns a cursor to the results.
        """
        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                          use_legacy_sql=False)
        conn = bq.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        return cursor
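A usage sketch, purely illustrative (the SQL, keys, task_id and DAG are assumptions, and handle_row above is a placeholder you would implement yourself):

# Hypothetical instantiation inside a DAG file:
bq_to_x = BigQueryToXOperator(
    task_id='bq_to_x',
    sql='SELECT name, value FROM `myproject.mydataset.mytable`',
    keys=['name', 'value'],
    bigquery_conn_id='bigquery_default',
    dag=dag
)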
Upvotes: 7
Reputation: 18844
You can use BigQueryOperator to save results in a temporary destination table and then use BigQueryGetDataOperator to fetch the results as below, and then use BigQueryTableDeleteOperator to delete the table:
get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)
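To round out the save/fetch/delete pattern described above, here is a hedged sketch of the surrounding tasks (the SQL and task names are assumptions; the destination table matches the get_data example):

from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator

# Write the query results into the temporary table that get_data reads
save_data = BigQueryOperator(
    task_id='save_data_to_temp_table',
    sql='SELECT DATE FROM `myproject.source_dataset.source_table`',  # illustrative query
    destination_dataset_table='test_dataset.Transaction_partitions',
    write_disposition='WRITE_TRUNCATE',
    use_legacy_sql=False,
    bigquery_conn_id='airflow-service-account'
)

# Drop the temporary table once its contents have been pulled
delete_temp_table = BigQueryTableDeleteOperator(
    task_id='delete_temp_table',
    deletion_dataset_table='test_dataset.Transaction_partitions',
    bigquery_conn_id='airflow-service-account'
)

save_data >> get_data >> delete_temp_table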
Docs:
Upvotes: 4