Reputation: 43
I need to run 2 SQL statements in the same session in Snowflake, so I am trying to use the connector's execute_string method to achieve this in Airflow.
My snowflake_hook.py looks like below:
from airflow.hooks.base_hook import BaseHook


class SnowflakeHook(BaseHook):
    def __init__(self, sf_conn_id, warehouse=None, source=None):
        """Snowflake hook init.

        Arguments:
            sf_conn_id: Airflow connection ID providing Snowflake credentials.
            warehouse: Warehouse override. Defaults to the "warehouse" key in the connection extra section.
            source: Hook source to pass to the parent class constructor.
        """
        import snowflake.connector

        super().__init__(source)
        tmp_conn = self.get_connection(sf_conn_id)
        self.user = tmp_conn.login
        self.password = tmp_conn.password
        extras = tmp_conn.extra_dejson
        self.account = extras.get("account", None)
        self.warehouse = warehouse if warehouse is not None else extras.get("warehouse", None)
        self.database = extras.get("database", None)
        self.schema = tmp_conn.schema
        self.role = extras.get("role", None)
        self.conn = snowflake.connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.schema,
            role=self.role,
        )

    def execute(self, qry):
        """Execute a single SQL statement."""
        cs = self.conn.cursor()
        try:
            cs.execute(qry)
            return cs.fetchall()
        finally:
            cs.close()

    # Added this part in snowflake_hook
    def execute_string(self, context):
        """Execute one or more SQL statements separated by semicolons."""
        cs = self.conn.cursor()
        try:
            cs.execute_string(context)
            return cs.fetchall()
        finally:
            cs.close()
My Airflow DAG uses this to execute the SQL statements as below:
def delete(sf_conn_id):
    from custom.hooks.snowflake_hook import SnowflakeHook

    table_list_sql = """
        create temporary table dim_expired
        as select * from expired_insert_stream;

        select distinct id
        from dim_expired
    """
    sf = SnowflakeHook(sf_conn_id)
    print(table_list_sql)
    res = sf.execute_string(table_list_sql)
This fails with the error "UnboundLocalError: local variable 'sf' referenced before assignment".
Upvotes: 0
Views: 2057
Reputation: 1046
Is there a reason you are not using the SnowflakeOperator?
For future reference, executing several SQL statements is possible with that operator without writing a custom hook:
from airflow.decorators import dag
from pendulum import datetime
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


@dag(
    dag_id="snowflake_basic",
    start_date=datetime(2022, 12, 10),
    schedule="@daily",
    catchup=False,
)
def snowflake_basic():
    t1 = SnowflakeOperator(
        task_id="t1",
        snowflake_conn_id="snowflake_conn",
        sql="""create temporary table temp_table_4
               as select * from table_2;

               select DISTINCT(customer_id)
               from temp_table_4;""",
    )


snowflake_basic()
You can also provide your statements in a SQL file. Tested with Airflow 2.5.0 and Snowflake provider 4.0.2.
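As a side note on the original hook: in the Snowflake connector, execute_string is a method on the connection object (returning a list of cursors), not on a cursor. The underlying same-session behavior the question depends on (a temporary table created by the first statement still being visible to the second) can be sketched with sqlite3 from the standard library as a stand-in for Snowflake; the table and column names here mirror the question but are otherwise made up for illustration:

```python
import sqlite3

# One connection = one session: a temp table created by an earlier
# statement is visible to later statements on the same connection.
conn = sqlite3.connect(":memory:")
conn.execute("create table expired_insert_stream (id integer)")
conn.executemany("insert into expired_insert_stream values (?)", [(1,), (2,), (2,)])

sql = """
create temporary table dim_expired as select * from expired_insert_stream;
select distinct id from dim_expired;
"""

# Run the semicolon-separated statements one at a time on the same
# cursor, then fetch the result set of the last statement. (The split
# is naive and assumes no semicolons inside string literals.)
statements = [s.strip() for s in sql.split(";") if s.strip()]
cur = conn.cursor()
for stmt in statements:
    cur.execute(stmt)
rows = sorted(row[0] for row in cur.fetchall())
print(rows)  # [1, 2]
```

The point of the sketch is only the session semantics; with the SnowflakeOperator above, the statement splitting and execution are handled for you.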
Upvotes: 3