Reputation: 460
I'm trying to save the results of a BigQuery query to a dataframe in a custom Airflow operator.
I've tried using the airflow.contrib.hooks.bigquery_hook and the get_pandas_df method. The task gets stuck on authentication: it wants me to manually visit a URL to authenticate.
As a result, I'm hard-coding the authentication credentials. This works, but is definitely not ideal.
Working but not ideal (credentials are hard-coded):
import os
from google.cloud import bigquery

def execute(self, context):
    # hard-coded path to the service account key file (not ideal)
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'my-file-location.json'
    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig()
    df = client.query(
        self.query,
        location="US",
        job_config=job_config,
    ).to_dataframe()
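If the key file path has to live in code anyway, a slightly cleaner sketch (same assumed file location, but no process-wide os.environ mutation) passes the key file straight to the client:

from google.cloud import bigquery

def execute(self, context):
    # build the client directly from the key file instead of setting os.environ
    client = bigquery.Client.from_service_account_json('my-file-location.json')
    df = client.query(self.query, location="US").to_dataframe()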
Not working:
def execute(self, context):
    bq = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,
                      use_legacy_sql=True, location='US')
    df = bq.get_pandas_df(self.query)
This code gets stuck authenticating. Here is the log: [2019-06-19 12:56:05,526] {logging_mixin.py:95} INFO - Please visit this URL to authorize this application.
Upvotes: 6
Views: 11939
Reputation: 3
I think this can solve your problem. Using the BigQueryHook, you can put it inside any Python operator task; it works for me:
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from pandas import DataFrame

## inside the operator
your_conn_id = 'your_connection_id'
your_sql = 'sql_to_do_in_bq'

bq_hook = BigQueryHook(bigquery_conn_id=your_conn_id, delegate_to=None, use_legacy_sql=False)
conn = bq_hook.get_conn()
cursor = conn.cursor()
cursor.execute(your_sql)
df = DataFrame(cursor.fetchall())
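Note that DataFrame(cursor.fetchall()) produces integer column labels, since the raw rows carry no column names. As a one-call alternative (assuming the same providers hook as above), get_pandas_df returns a DataFrame with named columns; whether it authenticates cleanly still depends on how the connection is configured:

df = bq_hook.get_pandas_df(your_sql)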
Upvotes: 0
Reputation: 333
Somehow I couldn't get BigQueryPandasConnector working. What I eventually ended up doing is using the credentials from the BigQueryHook to create a normal bigquery.client.Client with BigQuery's official Python client.
Here's an example:
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from google.cloud import bigquery

# bigquery_conn_id and sql are placeholders for your connection ID and query
bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, use_legacy_sql=False)
bq_client = bigquery.Client(project=bq_hook._get_field("project"), credentials=bq_hook._get_credentials())
df = bq_client.query(sql).to_dataframe()
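For context, here is a minimal sketch of a full custom operator built around this approach. The operator name and constructor fields are illustrative, and _get_credentials is the same private hook helper used above:

from airflow.models import BaseOperator
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from google.cloud import bigquery

class BigQueryToDataFrameOperator(BaseOperator):  # hypothetical operator name
    def __init__(self, query, gcp_conn_id='bigquery_default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.query = query
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        # reuse the hook's credentials instead of a hard-coded key file
        hook = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, use_legacy_sql=False)
        client = bigquery.Client(
            project=hook._get_field('project'),
            credentials=hook._get_credentials(),
        )
        # to_dataframe() pulls the query results into a pandas DataFrame
        return client.query(self.query).to_dataframe()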
Upvotes: 5
Reputation: 2850
Complementing the response of @Oluwafemi: now that you have the credentials for the BigQueryHook, you can use them to instantiate the BigQueryPandasConnector. According to the documentation, this connector:
... allows Airflow to use BigQuery with Pandas without forcing a three legged OAuth connection ...
Here's an example:
def execute(self, context):
    bq = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,
                      use_legacy_sql=True, location='US')
    connector = BigQueryPandasConnector(bq._get_field('project'), bq.get_service())
    df = connector.read_gbq(self.query)
Upvotes: 1
Reputation: 38922
It seems like no service account or key path is specified for the hook.
Here is a guide to setting up GCP connection. https://github.com/apache/airflow/blob/1.10.3/docs/howto/connection/gcp.rst
Set the AIRFLOW_CONN_BIGQUERY_DEFAULT environment variable in your Airflow configuration.
If the credentials file is available at a path accessible to your Airflow process, use the key_path query parameter.
Otherwise, set the key_dict query parameter to the URL-encoded JSON content of the credentials file.
AIRFLOW_CONN_BIGQUERY_DEFAULT=google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=airflow&extra__google_cloud_platform__num_retries=5
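For the key_dict route, a minimal sketch of building such a URI in Python. The key file path and project name are placeholders, and in Airflow 1.10 the extras field is spelled extra__google_cloud_platform__keyfile_dict, so check the exact parameter name against your version:

import os
from urllib.parse import quote

# read the service account key and URL-encode its JSON content
with open('/keys/key.json') as f:  # assumed key file location
    keyfile_dict = quote(f.read(), safe='')

os.environ['AIRFLOW_CONN_BIGQUERY_DEFAULT'] = (
    'google-cloud-platform://?'
    'extra__google_cloud_platform__keyfile_dict=' + keyfile_dict +
    '&extra__google_cloud_platform__scope=' + quote('https://www.googleapis.com/auth/cloud-platform', safe='') +
    '&extra__google_cloud_platform__project=airflow'
)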
Upvotes: 0