Reputation: 185
I am new to Spark and Python. I have a SQL query stored in a Python variable, and we use a Snowflake database. How can I create a Spark DataFrame from that SQL query using a Snowflake connection?
import sf_connectivity (we have a module for establishing a connection with the Snowflake database)
emp = 'Select * From Employee'
snowflake_connection = sf_connectivity.collector() (a method that establishes the Snowflake connection)
requirement 1: create a Spark DataFrame (sf_df) using 'emp' and 'snowflake_connection'
requirement 2: sf_df.createOrReplaceTempView("Temp_Employee")
What packages or libraries does this require? How can I make this work?
Upvotes: 0
Views: 1484
Reputation: 923
With public/private key authentication, you need to generate a key pair (see https://community.snowflake.com/s/article/How-to-connect-snowflake-with-Spark-connector-using-Public-Private-Key), and then you can use the code below.
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .config("spark.jars", "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
with open("<path/to/>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")
sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "<ACCOUNTNAME>",
    "sfUser": "<USER_NAME>",
    "pem_private_key": pkb,
    # "sfPassword": "xxxxxxxxx",
    "sfDatabase": "<DBNAME>",
    "sfSchema": "<SCHEMA_NAME>",
    "sfWarehouse": "<WH_NAME>",
    "sfRole": "<ROLENAME>",
}
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "select * from <TABLENAME>") \
    .load()

df.show()
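To cover the second requirement in the question, the resulting DataFrame can be registered as a temporary view and queried with Spark SQL. A minimal sketch, assuming df was loaded as above:

df.createOrReplaceTempView("Temp_Employee")       # requirement 2 from the question
spark.sql("select * from Temp_Employee").show()   # query the temp view with Spark SQL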
Upvotes: 1
Reputation: 257
The documentation that helped me figure this out is here: https://docs.databricks.com/data/data-sources/snowflake.html
It took me a while to figure out how to get it working, though! After a lot of questions, I had my company's IT department configure a Snowflake user account with private/public key authentication, and they made that ID accessible within our corporate Databricks account.
Once that was set up, the following code is an example of how to pass a SQL command as a variable to Spark and have Spark convert it into a DataFrame.
optionsSource = dict(
    sfUrl="mycompany.east-us-2.azure.snowflakecomputing.com",  # Snowflake Account Name
    sfUser="my_service_acct",
    pem_private_key=dbutils.secrets.get("my_scope", "my_secret"),
    sfDatabase="mydatabase",    # Snowflake Database
    sfSchema="myschema",        # Snowflake Schema
    sfWarehouse="mywarehouse",
    sfRole="myrole"
)
sqlcmd = '''
select current_date;
'''
df = spark.read.format("snowflake").options(**optionsSource).option("query", sqlcmd).load()
display(df)
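Since the query option accepts any SELECT statement, the SQL string held in a Python variable can be passed in directly, which is what the original question asks for. A sketch, assuming optionsSource is defined as above and an Employee table is visible to the configured role:

emp = 'Select * From Employee'
sf_df = spark.read.format("snowflake").options(**optionsSource).option("query", emp).load()
sf_df.createOrReplaceTempView("Temp_Employee")  # requirement 2 from the question
sf_df.show()  # plain-PySpark equivalent of display(df) outside Databricks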
Upvotes: 2