Dzhan

Reputation: 13

"AttributeError: 'GoogleBaseHook' object has no attribute 'get_records'" Error while trying to run Custom SqlSensor for BigQuery

I am migrating some DAGs from Cloud Composer 1 (Airflow 1) to Cloud Composer 2 (Airflow 2).

I have created a custom sensor class called BigQuerySqlSensor that acts like a SqlSensor for BigQuery.

I have done this by first creating another custom class called MySqlSensor, because Airflow has removed some of the attributes it supported on BaseHook and recommends using DbApiHook instead.

I have basically used the same source code as SqlSensor (can be found here) and replaced the BaseHook module with DbApiHook.
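For reference, the hook-resolution part of my MySqlSensor roughly follows the upstream SqlSensor source (a paraphrased sketch, not my exact code):

from airflow.hooks.base import BaseHook
from airflow.sensors.base import BaseSensorOperator

class MySqlSensor(BaseSensorOperator):
    def _get_hook(self):
        # conn.get_hook() instantiates whichever hook class is registered
        # for the connection's conn_type, so the type of 'bigquery_default'
        # decides which hook comes back
        conn = BaseHook.get_connection(self.conn_id)
        return conn.get_hook()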

Then I used MySqlSensor to create the custom BigQuerySqlSensor as below:

class BigQuerySqlSensor(MySqlSensor):
    def _get_hook(self):
        # Reuse whatever hook MySqlSensor resolves, but force standard SQL
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        return hook

The task where I am calling the BigQuerySqlSensor is:

task = BigQuerySqlSensor(
    task_id='my_task',
    conn_id='bigquery_default',
    timeout=<Some Numbers>,
    poke_interval=<Some Numbers>,
    mode='reschedule',
    sql='my_sql.sql'
)

However, I am getting an error such as:

        records = hook.get_records(self.sql, self.parameters)
AttributeError: 'GoogleBaseHook' object has no attribute 'get_records'

I am not sure where GoogleBaseHook is being picked up. I am guessing it has something to do with the connection in Airflow.

In my previous Cloud Composer instance, the same code runs fine with the same configurations and connections. I am out of ideas at this point. Any ideas would be appreciated!

Upvotes: 1

Views: 2463

Answers (2)

yingw

Reputation: 307

To give a bit more detail to @mrk's answer: starting with Airflow 2.4.0, SqlSensor has been moved into the new airflow.providers.common package, so you'd want to check that you're doing from airflow.providers.common.sql.sensors.sql import SqlSensor rather than from airflow.sensors.sql import SqlSensor.

For GCP, there can be multiple connection types. You'd want to make sure that your connection type is 'gcpbigquery', since there are multiple SQL products on GCP that the sensor could be used with.

You can check your connection type with:

from airflow.hooks.base import BaseHook
c = BaseHook.get_connection('bigquery_default') # name of connection
c.conn_type # should be gcpbigquery and not google-cloud-platform
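Going one step further, you can also check which hook class the connection actually resolves to (illustrative snippet; the connection name is assumed to be yours):

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('bigquery_default')
hook = conn.get_hook()
# BigQueryHook for conn_type 'gcpbigquery';
# GoogleBaseHook (no get_records) for 'google_cloud_platform'
print(type(hook).__name__)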

You can find the connection type in an environment variable, specified as something like AIRFLOW_CONN_BIGQUERY_DEFAULT='gcpbigquery://', or in your secrets backend.

Upvotes: 0

mrk

Reputation: 284

The problem is that you are using GoogleBaseHook instead of BigQueryHook (probably your connection type is google_cloud_platform).

It seems it could be solved by defining a gcpbigquery connection, but the base SqlSensor class checks the type of the hook and doesn't allow it. This looks like an Airflow bug, and it seems it has already been fixed on the Airflow main branch.

I solved it by defining my own sensor class this way:

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.sensors.sql import SqlSensor  # airflow.providers.common.sql.sensors.sql on Airflow >= 2.4

class BigQuerySqlSensor(SqlSensor):
    def _get_hook(self):
        return BigQueryHook(gcp_conn_id=self.conn_id, use_legacy_sql=False, location="us")

(conn_id is the identifier of a google_cloud_platform connection.) This way you can also pass additional hook parameters; it seems at least 'location' is required.
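For example, a task using it could look like this (task_id, conn_id, and query are placeholders to adapt to your setup):

wait_for_rows = BigQuerySqlSensor(
    task_id='wait_for_rows',
    conn_id='google_cloud_default',
    sql='SELECT COUNT(*) FROM my_dataset.my_table',
    mode='reschedule',
)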

It works for me on Airflow 2.2.3.

Upvotes: 2
