Vova

Reputation: 599

Python SnowflakeOperator setup snowflake_default

Good day, I cannot find how to do the basic setup for airflow.contrib.operators.snowflake_operator.SnowflakeOperator to connect to Snowflake. snowflake.connector.connect works fine.

When I do it with SnowflakeOperator:

op = snowflake_operator.SnowflakeOperator(sql="create table test(*****)", task_id='123')

I get the error:

airflow.exceptions.AirflowException: The conn_id `snowflake_default` isn't defined

I tried to insert a row into the backend SQLite DB:

INSERT INTO connection (conn_id, conn_type, host, schema, login, password, port, is_encrypted, is_extra_encrypted) VALUES (*****)

But after that I get an error:

snowflake.connector.errors.ProgrammingError: 251001: None: Account must be specified.

Passing an account kwarg into the SnowflakeOperator constructor does not help. It seems I cannot pass the account via the DB or via the constructor, yet it's required.

Please help me: what data should I insert into the local backend DB to be able to connect via SnowflakeOperator?
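
For reference, the connection table also has an extra column that I left empty; presumably the account belongs there as JSON. A hedged sketch of the kind of row I mean (values elided as above, untested):

INSERT INTO connection (conn_id, conn_type, host, schema, login, password, port, extra) VALUES (*****, '{"account": "...", "warehouse": "...", "region": "..."}');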

Upvotes: 2

Views: 3999

Answers (2)

TPPZ

Reputation: 4891

With this context:

$ airflow version
2.2.3
$ pip install snowflake-connector-python==2.4.1
$ pip install apache-airflow-providers-snowflake==2.5.0

You have to specify the Snowflake account and region twice, like this:

airflow connections add 'my_snowflake_db' \
    --conn-type 'snowflake' \
    --conn-login 'my_user' \
    --conn-password 'my_password' \
    --conn-port 443 \
    --conn-schema 'public' \
    --conn-host 'my_account_xyz.my_region_abc.snowflakecomputing.com' \
    --conn-extra '{ "account": "my_account_xyz", "warehouse": "my_warehouse", "region": "my_region_abc" }'

Without the account specified in both places it doesn't work, throwing the Python exception:

snowflake.connector.errors.ProgrammingError: 251001: 251001: Account must be specified

I think this might be due to the airflow command parameter --conn-host expecting a full domain with subdomain (the my_account_xyz.my_region_abc part), whereas for Snowflake these values are usually specified as query parameters, following a template similar to this (although I did not check all the combinations of the airflow connections add command and the DAG execution):

"snowflake://{user}:{password}@{account}{region}{cloud}/{database}/{schema}?role={role}&warehouse={warehouse}&timezone={timezone}"

Then a dummy Snowflake DAG like the SELECT 1; example below will find its own way to the Snowflake cloud service and work:

import datetime
from datetime import timedelta

from airflow.models import DAG

# https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowflake.html
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

my_dag = DAG(
    "example_snowflake",
    start_date=datetime.datetime.utcnow(),
    default_args={"snowflake_conn_id": "my_snowflake_db"},
    schedule_interval="0 0 1 * *",
    tags=["example"],
    catchup=False,
    dagrun_timeout=timedelta(minutes=10),
)

sf_task_1 = SnowflakeOperator(
    task_id="sf_task_1",
    dag=my_dag,
    sql="SELECT 1;",
)
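
To sanity-check the task without waiting for the monthly schedule, it can be executed once from the CLI (a usage sketch; the trailing argument is an arbitrary execution date):

airflow tasks test example_snowflake sf_task_1 2022-01-01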

Upvotes: 0

Andrii Soldatenko

Reputation: 567

Go to Admin -> Connections and update the snowflake_default connection. Based on the source code (airflow/contrib/hooks/snowflake_hook.py:53), we need to add extras like this:

{
    "schema": "schema",
    "database": "database",
    "account": "account",
    "warehouse": "warehouse"
}

(Screenshot of the snowflake_default connection in the Admin -> Connections edit form.)
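
In other words, the contrib hook assembles its snowflake.connector.connect() arguments from that extras JSON plus the regular connection fields; a paraphrased sketch of the logic (not the exact source):

from airflow.hooks.base_hook import BaseHook
import snowflake.connector

conn = BaseHook.get_connection('snowflake_default')
sf_conn = snowflake.connector.connect(
    user=conn.login,
    password=conn.password or '',
    schema=conn.schema or '',
    # the fields below only exist in the extras JSON, hence the
    # "Account must be specified" error when extras are left empty
    account=conn.extra_dejson.get('account', ''),
    database=conn.extra_dejson.get('database', ''),
    warehouse=conn.extra_dejson.get('warehouse', ''),
)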

Upvotes: 1
