Srule

Reputation: 460

How to query Google Big Query in Apache Airflow and return results as a Pandas Dataframe?

I'm trying to save a bigquery query to a dataframe in a custom Airflow operator.

I've tried using the airflow.contrib.hooks.bigquery_hook and the get_pandas_df method. The task gets stuck on authentication, as it wants me to manually visit a URL to authenticate.

As a result, I'm hard-coding the authentication. This works, but is definitely not ideal.

Working but not ideal (credentials are hard coded):

import os
from google.cloud import bigquery

def execute(self, context):
    # Hard-coded path to a service account key file; the client
    # library picks it up via GOOGLE_APPLICATION_CREDENTIALS.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'my-file-location.json'
    client = bigquery.Client()

    job_config = bigquery.QueryJobConfig()

    # Run the query and materialize the results as a DataFrame.
    df = client.query(
        self.query,
        location="US",
        job_config=job_config,
    ).to_dataframe()

Not working:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

def execute(self, context):
    bq = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,
                      use_legacy_sql=True, location='US')
    df = bq.get_pandas_df(self.query)

This code gets stuck authenticating. Here is the log:

[2019-06-19 12:56:05,526] {logging_mixin.py:95} INFO - Please visit this URL to authorize this application.

Upvotes: 6

Views: 11939

Answers (4)

AbdelRahman Abbas

Reputation: 3

I think this can solve your problem using the BigQueryHook. You can put it inside any Python operator task; it works for me:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from pandas import DataFrame


# inside the operator
your_conn_id = 'your_connection_id'
your_sql = 'sql_to_do_in_bq'

# Open a DB-API connection through the hook and run the query.
bq_hook = BigQueryHook(bigquery_conn_id=your_conn_id, delegate_to=None, use_legacy_sql=False)
conn = bq_hook.get_conn()
cursor = conn.cursor()
cursor.execute(your_sql)
df = DataFrame(cursor.fetchall())
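
One caveat worth noting (my addition): cursor.fetchall() returns bare rows, so the DataFrame ends up with numeric column labels. Assuming your provider version implements the DB-API description attribute on the cursor (recent versions of the Google provider do), you can restore the column names:

# Assumption: the cursor exposes the DB-API `description` attribute;
# the first element of each entry is the column name.
columns = [col[0] for col in cursor.description]
df = DataFrame(cursor.fetchall(), columns=columns)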

Upvotes: 0

duan

Reputation: 333

Somehow I couldn't get BigQueryPandasConnector working. What I eventually ended up with was using the credentials from BigQueryHook to create a regular bigquery.client.Client with BigQuery's official Python client.

Here's an example:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from google.cloud import bigquery

# Reuse the credentials Airflow already manages for the connection.
bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, use_legacy_sql=False)
bq_client = bigquery.Client(
    project=bq_hook._get_field("project"),
    credentials=bq_hook._get_credentials(),
)
df = bq_client.query(sql).to_dataframe()
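
For context, a minimal sketch (my own; the connection id 'bigquery_default', the query, the task id, and the dag object are all hypothetical) of wrapping this in a PythonOperator:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators.python_operator import PythonOperator
from google.cloud import bigquery

def fetch_df():
    # Build an authenticated client from the Airflow connection,
    # then run the query and collect the result as a DataFrame.
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default', use_legacy_sql=False)
    bq_client = bigquery.Client(
        project=bq_hook._get_field('project'),
        credentials=bq_hook._get_credentials(),
    )
    df = bq_client.query('SELECT 1 AS x').to_dataframe()
    print(df.head())

fetch_task = PythonOperator(
    task_id='fetch_df',
    python_callable=fetch_df,
    dag=dag,  # assumes a DAG object defined elsewhere
)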

Upvotes: 5

Tlaquetzal

Reputation: 2850

Complementing @Oluwafemi's answer: now that you have the credentials for the BigQueryHook, you can use them to instantiate a BigQueryPandasConnector. According to the documentation, this connector:

... allows Airflow to use BigQuery with Pandas without forcing a three legged OAuth connection ...

Here's an example:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook, BigQueryPandasConnector

def execute(self, context):
    bq = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,
                      use_legacy_sql=True, location='US')
    # The connector needs the GCP project and an authorized service object.
    connector = BigQueryPandasConnector(bq._get_field('project'), bq.get_service())
    df = connector.read_gbq(self.query)

Upvotes: 1

Oluwafemi Sule
Oluwafemi Sule

Reputation: 38922

It seems like no service account or key path is specified for the hook.

Here is a guide to setting up a GCP connection: https://github.com/apache/airflow/blob/1.10.3/docs/howto/connection/gcp.rst

Set the AIRFLOW_CONN_BIGQUERY_DEFAULT environment variable in your Airflow config file.

You can use the key_path query parameter if the credentials file is available at a path accessible to your Airflow process. Otherwise, set the key_dict query parameter to the URL-encoded JSON content of the credentials file.

AIRFLOW_CONN_BIGQUERY_DEFAULT=google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=airflow&extra__google_cloud_platform__num_retries=5
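
If you'd rather not hand-encode the URI, here's a small sketch (my own) that builds the same value with Python's standard library:

import urllib.parse

# urlencode percent-escapes each value, matching the
# hand-encoded example above.
params = {
    'extra__google_cloud_platform__key_path': '/keys/key.json',
    'extra__google_cloud_platform__scope': 'https://www.googleapis.com/auth/cloud-platform',
    'extra__google_cloud_platform__project': 'airflow',
    'extra__google_cloud_platform__num_retries': '5',
}
print('google-cloud-platform://?' + urllib.parse.urlencode(params))
# export the printed value as AIRFLOW_CONN_BIGQUERY_DEFAULT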

Upvotes: 0
