Reputation: 599
Good day. I cannot figure out the basic setup for airflow.contrib.operators.snowflake_operator.SnowflakeOperator to connect to Snowflake. A direct snowflake.connector.connect works fine. When I try it with SnowflakeOperator:
op = snowflake_operator.SnowflakeOperator(sql="create table test(*****)", task_id="123")
I get:
airflow.exceptions.AirflowException: The conn_id `snowflake_default` isn't defined
I tried inserting a row into the backend SQLite DB directly:
INSERT INTO connection (
    conn_id, conn_type, host,
    schema, login, password,
    port, is_encrypted, is_extra_encrypted
) VALUES (*****)
But after that I get an error:
snowflake.connector.errors.ProgrammingError: 251001: None: Account must be specified
Passing an account kwarg into the SnowflakeOperator constructor does not help either. It seems I cannot pass the account via the DB or via the constructor, yet it is required.
Please help me: what data should I insert into the backend local DB to be able to connect via SnowflakeOperator?
Upvotes: 2
Views: 3999
Reputation: 4891
With this context:
$ airflow version
2.2.3
$ pip install snowflake-connector-python==2.4.1
$ pip install apache-airflow-providers-snowflake==2.5.0
You have to specify the Snowflake account and region twice, like this:
airflow connections add 'my_snowflake_db' \
--conn-type 'snowflake' \
--conn-login 'my_user' \
--conn-password 'my_password' \
--conn-port 443 \
--conn-schema 'public' \
--conn-host 'my_account_xyz.my_region_abc.snowflakecomputing.com' \
--conn-extra '{ "account": "my_account_xyz", "warehouse": "my_warehouse", "region": "my_region_abc" }'
Otherwise it doesn't work, throwing the Python exception:
snowflake.connector.errors.ProgrammingError: 251001: 251001: Account must be specified
I think this might be because the airflow command parameter --conn-host expects a full domain with subdomain (the my_account_xyz.my_region_abc part), whereas for Snowflake these values are usually given as query parameters, following a template similar to this one (although I did not check all the combinations of the airflow connections add command and the DAG execution):
"snowflake://{user}:{password}@{account}{region}{cloud}/{database}/{schema}?role={role}&warehouse={warehouse}&timezone={timezone}"
Then a dummy Snowflake DAG like the SELECT 1; one below will find its way to the Snowflake cloud service and work:
import datetime
from datetime import timedelta

from airflow.models import DAG
# https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowflake.html
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

my_dag = DAG(
    "example_snowflake",
    start_date=datetime.datetime.utcnow(),
    default_args={"snowflake_conn_id": "my_snowflake_db"},
    schedule_interval="0 0 1 * *",
    tags=["example"],
    catchup=False,
    dagrun_timeout=timedelta(minutes=10),
)

sf_task_1 = SnowflakeOperator(
    task_id="sf_task_1",
    dag=my_dag,
    sql="SELECT 1;",
)
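Not part of the original answer, but a quick way to sanity-check the connection outside a DAG run is the provider's hook. A minimal sketch, assuming the my_snowflake_db connection created above:

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Resolves "my_snowflake_db" from the Airflow connection store and runs a
# trivial query; this raises the same ProgrammingError if the account or
# region is still missing or wrong.
hook = SnowflakeHook(snowflake_conn_id="my_snowflake_db")
print(hook.get_first("SELECT 1;"))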
Upvotes: 0
Reputation: 567
Go to Admin -> Connections and update the snowflake_default connection. Based on the hook's source code (airflow/contrib/hooks/snowflake_hook.py:53), we need to add extras like this:
{
    "schema": "schema",
    "database": "database",
    "account": "account",
    "warehouse": "warehouse"
}
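If you prefer to seed snowflake_default from code instead of the Admin UI, here is a minimal sketch using Airflow's ORM; all concrete values are hypothetical placeholders, and the extras keys mirror what the hook reads per the source line above:

import json

from airflow import settings
from airflow.models import Connection

# Hypothetical placeholder values; the extras JSON carries the fields the
# Snowflake hook looks up: schema, database, account, warehouse.
conn = Connection(
    conn_id="snowflake_default",
    conn_type="snowflake",
    host="my_account_xyz.my_region_abc.snowflakecomputing.com",
    login="my_user",
    password="my_password",
    schema="public",
    extra=json.dumps({
        "schema": "public",
        "database": "my_database",
        "account": "my_account_xyz",
        "warehouse": "my_warehouse",
    }),
)

session = settings.Session()
session.add(conn)
session.commit()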
Upvotes: 1