Ajay
Ajay

Reputation: 267

How to execute a stored procedure in Azure Databricks PySpark?

I am able to execute a simple SQL statement using PySpark in Azure Databricks but I want to execute a stored procedure instead. Below is the PySpark code I tried.

#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
    .option("dbtable", table) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries="execute testJoin"
resultDF=spark.sql(sqlQueries)
resultDF.show(resultDF.count(),False)

This doesn't work — how do I do it?

Upvotes: 9

Views: 40402

Answers (6)

Sanket Lothe
Sanket Lothe

Reputation: 1

def get_sql_properties_and_uri() -> Tuple[Dict[str, str], str]:
        # JDBC URL for Azure SQL Database
        jdbcUrl = f"jdbc:sqlserver://{self.jdbcHostname}:{self.jdbcPort};database={self.jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    
        connectionProperties = {
            "user": self.jdbcUsername,
            "password": self.jdbcPassword,
            "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        }

        return connectionProperties, jdbcUrl


def execute_stored_procedure(connection_properties:str,jdbc_url:str):

        # Insert today's date (if no row exists for today)
        query_to_execute = f"""
        EXEC get_modified_device_averages;
        """
       
        try:
            
            # Use JDBC connection to execute SQL commands
            connection = jaydebeapi.connect(
                connection_properties['driver'],
                jdbc_url,
                [connection_properties['user'], connection_properties['password']]
            )
            cursor = connection.cursor()
            cursor.execute(query_to_execute)
            connection.commit()
            cursor.close()
            connection.close()
        except Exception as e:
            print(f"Error executing SQL queries: {e}")

connection_properties, jdbc_url = get_sql_properties_and_uri()

// Call execute_stored_procedure   

Upvotes: 0

Sanket Lothe
Sanket Lothe

Reputation: 1

# Install jaydebeapi
!pip install jaydebeapi

import jaydebeapi

# Write your own keys below
jdbcHostname = ''  # e.g., 'your_server.database.windows.net'
jdbcPort = 1433
jdbcDatabase = ''  # e.g., 'your_database'
jdbcUsername = ''  # e.g., 'your_username'
jdbcPassword = ''  # e.g., 'your_password'

# Get JDBC URL for SQL Database
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

def execute_stored_procedure(connection_properties: dict, jdbc_url: str):
    # Insert today's date (if no row exists for today)
    query_to_execute = """
    EXEC your_stored_procedure;
    """

    try:
        # Use JDBC connection to execute SQL commands
        connection = jaydebeapi.connect(
            connection_properties['driver'],
            jdbc_url,
            [connection_properties['user'], connection_properties['password']]
        )
        cursor = connection.cursor()
        cursor.execute(query_to_execute)
        connection.commit()
        cursor.close()
        connection.close()
    except Exception as e:
        print(f"Error executing SQL queries: {e}")

# Call execute_stored_procedure function
execute_stored_procedure(connectionProperties, jdbcUrl)

Upvotes: 0

S. Bills
S. Bills

Reputation: 1

Here is simple way to execute a procedure on SQL Server from an Azure Databricks Notebook using python:

%pip install pymssql

import pymssql 

with pymssql.connect(server=f"{sqlServer}.database.windows.net", user=dbUser, password=dbPword, database=sqlDb) as conn:
    with conn.cursor() as cursor:
        cursor.callproc(<the name of your proc>) 
        conn.commit()

Upvotes: 0

jonathan-dufault-kr
jonathan-dufault-kr

Reputation: 698

I believe the top answer shows how to execute a command/stored procedure but not how to get the result if the stored procedure returns a table. This is the first result on Google for me, so hope this helps someone.

Solution

The reason executing the stored procedure fails in the first place is that spark parenthesizes the query and assigns it to an alias (select * from (query)). More details on that here https://stackoverflow.com/a/75330385/19871699

I have two different sets of code one pyspark, one scala. Both do the same thing using the jdbc driver. I like the scala better. Right now it requires the dataset to fit in memory but you can do stuff with batching to work around it.

A lot of the documentation on how to use the driver is here: https://learn.microsoft.com/en-us/sql/connect/jdbc/microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16

Both sets of code:

  1. create a connection object to the server
  2. make a sql query statement and execute it
  3. fetch a metadata and result set
  4. extract information on the columns/count
  5. loop through every row of the result set and every field within it to extract the values, populating a row list for the fields, and then appending it to the rows list.
  6. Convert the resulting list of lists into a dataframe

Python

There's an issue with dates staying a JavaObject type although pandas reads it just fine. If you did spark.createDataFrame(pd.DataFrame(...)) it'd complain about the type, but an intermediary csv works.

I use the getObject because it extracts whatever the result is. There are other methods like getString,getFloat,... that would be better

This is the worst possible scenario though for py4j. You are serializing every single value in the entire table one by one to send from scala over to python.

%python
import pandas as pd

# parameters
username = ""
password = ""
host = ""
port = ""
database = ""

query = """
exec ...
"""

# construct jdbc url
sqlsUrl = f"jdbc:sqlserver://{host}:{port};database={database}"


# get the gateway/connection to py4j
gateway = sc._gateway
jvm = gateway.jvm

# connection to the server
con = jvm.java.sql.DriverManager.getConnection(sqlsUrl, username,password)


# create a statement, execute it, get result set and metadata
statement = con.prepareCall(query)
statement.execute()

metadata = statement.getMetaData()
resultset = statement.getResultSet()

# extract column names from metadata
columns = [metadata.getColumnName(i+1) for i in range(metadata.getColumnCount())]


# loop through the result set and make into a list of lists
rows = []
while resultset.next():
    row = []
    for i in range(len(columns)):
        row.append(resultset.getObject(i+1))
    rows.append(row)

# close the connection
con.close()

# make into a pandas dataframe and write temporarily to a csv (bug with date)
pd.DataFrame(rows, columns=columns).to_csv("tmp.csv",index=False)

df = spark.createDataFrame(pd.read_csv("tmp.csv"))

# voila
display(df)

Scala

%scala
import org.apache.spark.sql.types._
import java.sql.DriverManager
import java.sql.ResultSet

// connection parameters
val username = ""
val password = ""
val host = ""
val port = ""
val database = ""

val sqlsUrl = s"jdbc:sqlserver://$host:$port;databaseName=$database"

// query
val query = """
exec ..
"""

// get connection
val connection = DriverManager.getConnection(sqlsUrl, username,password)

// prepare statement and execute it
val statement = connection.prepareCall(query)
statement.executeQuery()


// fetch results and the structure of the results
val metaData = statement.getMetaData()
val resultSet = statement.getResultSet()
val indices = (1 to metaData.getColumnCount).toList

// translation of java types to spark types
val columnTypesSpark = Map(
    "java.lang.String"-> StringType,
    "java.lang.Short"-> ShortType,
    "java.sql.Date"-> DateType,
    "java.sql.Timestamp"-> TimestampType,
    "java.math.BigDecimal"-> DecimalType(10,1), // whatever precision you want
    "java.lang.Float" -> FloatType,
    "java.lang.Integer" -> IntegerType,
    "java.lang.Boolean" -> BooleanType)


// list out the column types in the returned data
val columnTypes = indices.map(i => columnTypesSpark(metaData.getColumnClassName(i)) )

// list out the column names in the returned data
val columnNames = indices.map(i => metaData.getColumnLabel(i))

// define the schema
val schema = StructType(indices.map(i => StructField(columnNames(i-1),columnTypes(i-1)) ))

// loop through the results dataset
val results: List[Row] = Iterator
  .continually {
    if (resultSet.next()) Some(Row(indices.map(o => resultSet.getObject(o)).toList:_*))
    else None
  }
  .takeWhile(_.isDefined)
  .map(_.get)
  .toList

// close connection
con.close()

// convert results rowset into an RDD and then assign results into a dataframe
val df = spark.createDataFrame(sc.parallelize(results),schema)

display(df)

Long answer. Hope that helps someone.

Upvotes: 1

Dr. Casual
Dr. Casual

Reputation: 438

In case someone is still looking for a method on how to do this, it's possible to use the built-in jdbc-connector of you spark session. Following code sample will do the trick:

import msal

# Set url & credentials
jdbc_url = ...
tenant_id = ...
sp_client_id = ...
sp_client_secret = ...

# Write your SQL statement as a string
name = "Some passed value"

statement = f"""
EXEC Staging.SPR_InsertDummy
  @Name = '{name}'
"""

# Generate an OAuth2 access token for service principal
authority = f"https://login.windows.net/{tenant_id}"
app = msal.ConfidentialClientApplication(sp_client_id, sp_client_secret, authority)
token = app.acquire_token_for_client(scopes="https://database.windows.net/.default")["access_token"]

# Create a spark properties object and pass the access token
properties = spark._sc._gateway.jvm.java.util.Properties()
properties.setProperty("accessToken", token)

# Fetch the driver manager from your spark context
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager

# Create a connection object and pass the properties object
con = driver_manager.getConnection(jdbc_url, properties)

# Create callable statement and execute it
exec_statement = con.prepareCall(statement)
exec_statement.execute()

# Close connections
exec_statement.close()
con.close()

For more information and a similar method using SQL-user credentials to connect over JDBC, or on how to take return parameters, I'd suggest you take a look at this blogpost:

https://medium.com/delaware-pro/executing-ddl-statements-stored-procedures-on-sql-server-using-pyspark-in-databricks-2b31d9276811

Upvotes: 7

BICube
BICube

Reputation: 4681

Running a stored procedure through a JDBC connection from azure databricks is not supported as of now. But your options are:

  1. Use a pyodbc library to connect and execute your procedure. But by using this library, it means that you will be running your code on the driver node while all your workers are idle. See this article for details. https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark

  2. Use a SQL table function rather than procedures. In a sense, you can use anything that you can use in the FORM clause of a SQL query.

  3. Since you are in an azure environment, then using a combination of azure data factory (to execute your procedure) and azure databricks can help you to build pretty powerful pipelines.

Upvotes: 5

Related Questions