Reputation: 13
I am migrating some DAGs from Cloud Composer 1 (Airflow 1) to Cloud Composer 2 (Airflow 2).
I have created a custom sensor class called BigQuerySqlSensor that acts like a SqlSensor for BigQuery.
I have done this by creating another custom sensor class called MySqlSensor, because Airflow has removed some of the supported attributes of BaseHook and recommends using DbApiHook instead.
I have basically used the same source code as SqlSensor (can be found here) and replaced the BaseHook module with DbApiHook.
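For reference, the relevant part of my copy looks roughly like this (a sketch of the upstream SqlSensor with the hook check swapped to DbApiHook; import paths vary by Airflow version):

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.hooks.dbapi import DbApiHook  # airflow.providers.common.sql.hooks.sql on newer versions
from airflow.sensors.base import BaseSensorOperator

class MySqlSensor(BaseSensorOperator):
    template_fields = ('sql',)
    template_ext = ('.sql',)

    def __init__(self, *, conn_id, sql, parameters=None, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.sql = sql
        self.parameters = parameters

    def _get_hook(self) -> DbApiHook:
        conn = BaseHook.get_connection(self.conn_id)
        hook = conn.get_hook()  # the hook class is resolved from the connection's conn_type
        if not isinstance(hook, DbApiHook):
            raise AirflowException(f"Connection {self.conn_id} does not return a DbApiHook")
        return hook

    def poke(self, context):
        # This is the call that fails when the resolved hook is GoogleBaseHook
        records = self._get_hook().get_records(self.sql, self.parameters)
        return bool(records) and bool(records[0][0])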
Then I used MySqlSensor to create the custom BigQuerySqlSensor like below:
class BigQuerySqlSensor(MySqlSensor):
    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        return hook
The task where I am calling the BigQuerySqlSensor is:
task = BigQuerySqlSensor(
    task_id='my_task',
    conn_id='bigquery_default',
    timeout=<Some Numbers>,
    poke_interval=<Some Numbers>,
    mode='reschedule',
    sql='my_sql.sql'
)
However, I am getting an error such as:
records = hook.get_records(self.sql, self.parameters)
AttributeError: 'GoogleBaseHook' object has no attribute 'get_records'
I am not sure where GoogleBaseHook is being picked up; I am guessing it has something to do with the connection in Airflow. In my previous Cloud Composer instance, I was running the same code with the same configurations and connections. I am out of ideas at this point. Any ideas would be appreciated!
Upvotes: 1
Views: 2463
Reputation: 307
To give a bit more detail on @mrk's answer: starting with Airflow 2.4.0, SqlSensor has been moved into the new airflow.providers.common.sql package, so you would want to check that you're doing from airflow.providers.common.sql.sensors.sql import SqlSensor rather than from airflow.sensors.sql import SqlSensor.
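If the same DAGs have to run on both the old and the new environment during the migration, a version-tolerant import (a common pattern, not something from the question) bridges the move:

try:
    # Airflow >= 2.4.0: SqlSensor lives in the common-sql provider
    from airflow.providers.common.sql.sensors.sql import SqlSensor
except ImportError:
    # Earlier Airflow 2.x releases
    from airflow.sensors.sql import SqlSensor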
For GCP there can be multiple connection types. You'd want to make sure that your connection type is 'gcpbigquery' (since there are multiple SQL products on GCP that the sensor could be used for).
You can check your connection type with:

from airflow.hooks.base import BaseHook

c = BaseHook.get_connection('bigquery_default')  # name of the connection
c.conn_type  # should be 'gcpbigquery', not 'google_cloud_platform'
You can find the connection type in an environment variable specified as something like AIRFLOW_CONN_BIGQUERY_DEFAULT='gcpbigquery://', or in your secrets backend.
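You can also check which hook class the connection actually resolves to, which is what SqlSensor does internally via Connection.get_hook():

from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('bigquery_default')
hook = conn.get_hook()  # the hook class is looked up from conn.conn_type
# 'google_cloud_platform' resolves to GoogleBaseHook (which has no get_records),
# while 'gcpbigquery' resolves to BigQueryHook (a DbApiHook subclass)
print(type(hook).__name__)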
Upvotes: 0
Reputation: 284
The problem is that you are using GoogleBaseHook instead of BigQueryHook (probably your connection type is google_cloud_platform).
It seems it could be solved by defining a gcpbigquery connection, but the base SqlSensor class checks the type of the hook and doesn't allow it (this looks like an Airflow bug, and it seems to be fixed already on the Airflow main branch).
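For reference, the check that gets in the way looks roughly like this in the 2.2-era SqlSensor (a paraphrase, not the exact upstream source):

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

def _get_hook(conn_id):
    conn = BaseHook.get_connection(conn_id)
    allowed_conn_type = {'google_cloud_platform', 'jdbc', 'mssql', 'mysql', 'odbc',
                         'oracle', 'postgres', 'presto', 'snowflake', 'sqlite', 'vertica'}
    if conn.conn_type not in allowed_conn_type:
        # 'gcpbigquery' is not in the allowlist, so a BigQuery-typed connection
        # is rejected even though its hook supports get_records
        raise AirflowException(f"Connection type ({conn.conn_type}) is not supported")
    return conn.get_hook()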
I solved it by defining my own sensor class this way:
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.sensors.sql import SqlSensor  # airflow.providers.common.sql.sensors.sql on Airflow >= 2.4

class BigQuerySqlSensor(SqlSensor):
    def _get_hook(self):
        # Bypass the connection-type check by returning the hook directly
        return BigQueryHook(gcp_conn_id=self.conn_id, use_legacy_sql=False, location="us")
(conn_id is the identifier of a google_cloud_platform connection.) This way you can also add additional hook parameters; it seems at least location is required.
It works for me on Airflow 2.2.3.
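If different DAGs need a different location or other hook parameters, a small variation (the parameter name is illustrative) makes them configurable:

class BigQuerySqlSensor(SqlSensor):
    def __init__(self, *, location='us', **kwargs):
        super().__init__(**kwargs)
        self.location = location

    def _get_hook(self):
        return BigQueryHook(
            gcp_conn_id=self.conn_id,
            use_legacy_sql=False,
            location=self.location,
        )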
Upvotes: 2