Sara

Reputation: 43

Execution of multiple SQL statements in Airflow using execute_string

I need to run 2 SQL statements in the same Snowflake session, so I am trying to use the Snowflake connector's execute_string method from a custom hook in Airflow.

My snowflake_hook.py looks like this:

from airflow.hooks.base_hook import BaseHook
class SnowflakeHook(BaseHook):
    

    def __init__(self, sf_conn_id, warehouse=None, source=None):
        """Snowflake hook init.

        Arguments:
        sf_conn_id: Airflow connection ID providing Snowflake credential.
        warehouse: Warehouse override. Defaults to the "warehouse" key in the connection's extra section.
        source: Hook source to pass to the parent class constructor.
        """
        import snowflake.connector

        super().__init__(source)
        tmp_conn = self.get_connection(sf_conn_id)
        self.user = tmp_conn.login
        self.password = tmp_conn.password
        extras = tmp_conn.extra_dejson
        self.account = extras.get("account", None)
        self.warehouse = warehouse if warehouse is not None else extras.get("warehouse", None)
        self.database = extras.get("database", None)
        self.schema = tmp_conn.schema
        self.role = extras.get("role", None)
        self.conn = snowflake.connector.connect(
            user=self.user,
            password=self.password,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.schema,
            role=self.role,
        )
    
    def execute(self, qry):
        """Execute a sql statement"""
        cs = self.conn.cursor()
        try:
            cs.execute(qry)
            return cs.fetchall()
        finally:
            cs.close()

I added this part to snowflake_hook:

    def execute_string(self, context):
        """Execute one or more SQL statements separated by semicolons."""
        cs = self.conn.cursor()
        try:
            cs.execute_string(context)
            return cs.fetchall()
        finally:
            cs.close()
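For reference, a likely issue with the method above: in the Snowflake Python connector, execute_string is defined on the connection object (SnowflakeConnection.execute_string), not on the cursor, and it returns one cursor per semicolon-separated statement. A minimal sketch of a wrapper built on that shape, exercised here against a fake connection so it runs without Snowflake credentials (all names below are illustrative, not the asker's code):

```python
class MultiStatementMixin:
    """Sketch: wrap the connection-level execute_string, which returns
    one cursor per semicolon-separated statement."""

    def execute_string(self, sql_text):
        # Each returned cursor holds the result set of one statement;
        # collect fetchall() from all of them.
        return [cur.fetchall() for cur in self.conn.execute_string(sql_text)]


# Minimal fakes standing in for a real SnowflakeConnection, so the
# wrapper can be demonstrated without credentials.
class _FakeCursor:
    def __init__(self, rows):
        self._rows = rows

    def fetchall(self):
        return self._rows


class _FakeConnection:
    def execute_string(self, sql_text):
        statements = [s.strip() for s in sql_text.split(";") if s.strip()]
        # One fake cursor per statement, echoing the statement text as a row.
        return [_FakeCursor([(s,)]) for s in statements]


class DemoHook(MultiStatementMixin):
    def __init__(self):
        self.conn = _FakeConnection()


results = DemoHook().execute_string("select 1; select 2")
print(results)
```

Against the real connector, self.conn would be the snowflake.connector connection and the loop shape stays the same.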

My Airflow DAG uses the hook to execute the SQL statements like this:

def delete(sf_conn_id):
    from custom.hooks.snowflake_hook import SnowflakeHook

    table_list_sql = """
        create temporary table dim_expired
            as select * from expired_insert_stream;
        select distinct id
          from dim_expired
        """

    sf = SnowflakeHook(sf_conn_id)
    print(table_list_sql)
    res = sf.execute_string(table_list_sql)

This fails with the error "UnboundLocalError: local variable 'sf' referenced before assignment".

Upvotes: 0

Views: 2057

Answers (1)

TJaniF

Reputation: 1046

Is there a reason you are not using the SnowflakeOperator?

For future reference, executing several SQL statements is possible with that operator, without writing a custom hook:

from airflow.decorators import dag
from pendulum import datetime
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

@dag(
    dag_id="snowflake_basic",
    start_date=datetime(2022, 12, 10),
    schedule="@daily",
    catchup=False,
)
def snowflake_basic():

    t1 = SnowflakeOperator(
        task_id="t1",
        snowflake_conn_id="snowflake_conn",
        sql="""create temporary table temp_table_4  
            as select * from table_2; 
            select DISTINCT(customer_id)  
            from temp_table_4;"""
    )

snowflake_basic()

You can also provide your statements in a SQL file. Tested with Airflow 2.5.0 and the Snowflake provider 4.0.2.
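The SQL-file variant could be sketched as below, assuming the .sql file sits in a directory listed in template_searchpath; the path and filename are illustrative, and this DAG fragment only runs inside an Airflow deployment:

```python
from airflow.decorators import dag
from pendulum import datetime
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

@dag(
    dag_id="snowflake_from_file",
    start_date=datetime(2022, 12, 10),
    schedule="@daily",
    catchup=False,
    # Assumed location of the SQL file; adjust to your deployment.
    template_searchpath="/usr/local/airflow/include",
)
def snowflake_from_file():
    SnowflakeOperator(
        task_id="run_statements",
        snowflake_conn_id="snowflake_conn",
        # A filename ending in .sql is template-rendered and its contents
        # (semicolon-separated statements) are executed.
        sql="statements.sql",
    )

snowflake_from_file()
```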

Upvotes: 3
