K.Tom
K.Tom

Reputation: 185

How to create a Spark data frame using snow flake connection in python?

I am new to Spark and Python and I have a sql which is stored in a variable in python and we use SnowFlake database. How to create a spark datafrom using SQL with snowflake connection?

    import sf_connectivity (we have a code for establishing connection with Snowflake database)
    emp = 'Select * From Employee'
    snowflake_connection = sf_connectivity.collector() (It is a method to establish snowflake conenction)
    requirement 1: Create Spark Dataframe (sf_df) using 'emp' and 'snowflake_connection '
    requirement 2: sf_df.createOrReplaceTempView(Temp_Employee)

What are the packages or libraries it requires? How can I make this work?

Upvotes: 0

Views: 1484

Answers (2)

Ankur Srivastava
Ankur Srivastava

Reputation: 923

With Public/Private key , you need to generate a cert https://community.snowflake.com/s/article/How-to-connect-snowflake-with-Spark-connector-using-Public-Private-Key and then you can use the below code .

from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import re
import os
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars", "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.repl.local.jars",
            "<path/to/>/snowflake-jdbc-<version>.jar,<path/to/>/spark-snowflake_2.11-2.4.13-spark_2.4.jar") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .getOrCreate()

spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.disablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

with open("<path/to/>/rsa_key.p8", "rb") as key_file:
    p_key = serialization.load_pem_private_key(
        key_file.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)
pkb = pkb.decode("UTF-8")
pkb = re.sub("-*(BEGIN|END) PRIVATE KEY-*\n", "", pkb).replace("\n", "")


sfOptions = {
    "sfURL": "<URL>",
    "sfAccount": "<ACCOUNTNAME>",
    "sfUser": "<USER_NAME",
    "pem_private_key": pkb,
    # "sfPassword": "xxxxxxxxx",
    "sfDatabase": "<DBNAME>",
    "sfSchema": "<SCHEMA_NAME>",
    "sfWarehouse": "<WH_NAME>",
    "sfRole": "<ROLENAME>",
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "<TABLENAME>") \
    .load()

df.show()

Upvotes: 1

Nathan T Alexander
Nathan T Alexander

Reputation: 257

The documentation that helped me figure this out is here: https://docs.databricks.com/data/data-sources/snowflake.html

Took me awhile to figure out how to get it working though! After a lot of questions, I had my company's IT department configure a snowflake user account with private/public key authentication, and they configured that ID to be accessible within our corporate Databricks account.

After they set this up, the following code is an example how to pass a sql command as variable to Spark, and have Spark convert it into a dataframe.

optionsSource = dict(sfUrl="mycompany.east-us-2.azure.snowflakecomputing.com", # Snowflake Account Name
                          sfUser="my_service_acct",
                          pem_private_key=dbutils.secrets.get("my_scope", "my_secret"),
                   sfDatabase="mydatabase", # Snowflake Database
                   sfSchema="myschema", # Snowflake Schema
                   sfWarehouse="mywarehouse",
                   sfRole="myrole"
                        )   

    sqlcmd = '''
    select current_date;
    '''

    df = spark.read.format("snowflake").options(**optionsSource).option("query", sqlcmd).load()
    display(df)

Upvotes: 2

Related Questions