Reputation: 300
In short, my need: create a Spark DataFrame from a fairly complex T-SQL (SQL Server) query and/or from the output of a SQL Server stored procedure.
As far as I understand, Spark does not allow executing queries in the dialect of the underlying data source. Yes, there is a way to obtain a low-level object and run stored procedures, but that way I don't get a Spark DataFrame as output.
So I thought of running the query the classic pyodbc way, fetching the results, and then building the Spark DataFrame with SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True), providing the data and the schema. I can obtain the data, but I can't build the schema (a list of (column name, data type) pairs) from the output cursor. Here is a working example that generates and extracts sample data from a local instance of SQL Server:
import pyodbc
connection_string = "Driver={SQL Server};Server=LOCALHOST;Database=master;Trusted_Connection=yes;"
db_connection = pyodbc.connect(connection_string)
sql_query = """
SET NOCOUNT ON
DECLARE @TBL_TEST AS TABLE (
column_1 INT NOT NULL PRIMARY KEY CLUSTERED IDENTITY(1, 1),
column_2 VARCHAR(10) NOT NULL,
column_3 VARCHAR(20) NULL,
column_4 INT NOT NULL
)
INSERT INTO @TBL_TEST (column_2, column_3, column_4)
VALUES
('test1_col2', 'test1_col3', 100),
('test2_col2', 'test2_col3', 200),
('test3_col2', NULL, 300)
SET NOCOUNT OFF
SELECT t.* FROM @TBL_TEST AS t
"""
cursor = db_connection.cursor()
rows = cursor.execute(sql_query).fetchall()
cursor.close()
db_connection.close()
print(rows)
How can I extract the schema from the returned cursor and obtain a schema object to pass to the createDataFrame() function?
Keep in mind that my goal is the one stated at the top, so other approaches are also welcome!
Thank you in advance!
Upvotes: 0
Views: 4621
Reputation: 14925
I hope this fits your use case. Note that this will not scale, since it runs on the driver node and not on the executors. If you have a 5-node cluster, this will run on 1 node only.
With that said, we can let Spark infer the data types. If you look at the pyodbc documentation, you will see that you have to install a local ODBC driver. This is fine for Anaconda on a workstation but awkward on a Spark cluster. Instead, use the pymssql module, which is self-contained native code. Install it from the PyPI section of the cluster libraries.
Now that we have a driver, let's write a function that returns a data frame from either a SELECT statement or a stored procedure call (EXEC).
#
# Create function to call tsql + return df
#
# use module
import pymssql

# define function
# info is a tuple: (server, user, password, database, query)
# assumes a notebook session where `spark` is already defined
def call_tsql(info):
    # make connection
    conn = pymssql.connect(server=info[0], user=info[1], password=info[2], database=info[3])
    # open cursor and run the statement
    cursor = conn.cursor()
    cursor.execute(info[4])
    # grab column data (name, type, ...)
    desc = cursor.description
    # grab data as list of tuples
    dat1 = cursor.fetchall()
    # commit any work done by the statement, then close the connection
    conn.commit()
    conn.close()
    # extract column names
    col1 = [x[0] for x in desc]
    # let spark infer data types
    df1 = spark.createDataFrame(data=dat1, schema=col1)
    # return dataframe
    return df1
This code supports only one result set. Modify it if your query or stored procedure returns multiple result sets.
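If more than one result set comes back, the following is a minimal sketch of collecting them all. It assumes a DB-API cursor that implements nextset(), which both pymssql and pyodbc cursors provide. The helper name fetch_all_result_sets is hypothetical and not part of either library.

```python
def fetch_all_result_sets(cursor):
    """Collect (column_names, rows) for every result set on an executed cursor."""
    results = []
    while True:
        # description is None for statements that return no rows (e.g. INSERT)
        if cursor.description is not None:
            columns = [col[0] for col in cursor.description]
            rows = [tuple(row) for row in cursor.fetchall()]
            results.append((columns, rows))
        # nextset() returns a falsy value once no further result set is available
        if not cursor.nextset():
            break
    return results
```

Each (columns, rows) pair could then be passed to spark.createDataFrame(data=rows, schema=columns) in the same way as in call_tsql.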
There are tons of comments. In short, a tuple of information for the connection and the TSQL is passed in as parameters and the dataframe is returned.
Yes, there are data types in cursor.description but they are encoded. I did not find a good mapping. Since you are not dealing with large data, infer the schema. Otherwise, pass a DDL statement for the schema instead of column headers.
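For the pyodbc cursor from the question, one option is to build that DDL string from cursor.description, because pyodbc reports each column's type_code as a Python class (int, str, and so on). The sketch below is only an illustration: the mapping table, the STRING fallback, and the helper names ddl_from_description and rows_to_tuples are all assumptions, and it is not meant for pymssql, whose type codes are encoded as noted above.

```python
import datetime
import decimal

# Assumed mapping from the Python classes pyodbc reports as type_code
# to Spark SQL type names; adjust it to your data.
PYTHON_TO_SPARK = {
    bool: "BOOLEAN",
    int: "BIGINT",
    float: "DOUBLE",
    str: "STRING",
    bytes: "BINARY",
    bytearray: "BINARY",
    decimal.Decimal: "DECIMAL(38,18)",
    datetime.date: "DATE",
    datetime.datetime: "TIMESTAMP",
}


def ddl_from_description(description, default="STRING"):
    """Build a Spark DDL schema string from a DB-API cursor.description."""
    parts = []
    for column in description:
        # each entry starts with (name, type_code, ...)
        name, type_code = column[0], column[1]
        spark_type = PYTHON_TO_SPARK.get(type_code, default)
        parts.append(f"`{name}` {spark_type}")
    return ", ".join(parts)


def rows_to_tuples(rows):
    """Convert driver row objects (e.g. pyodbc.Row) into plain tuples."""
    return [tuple(row) for row in rows]
```

With the question's code, this would be called as schema = ddl_from_description(cursor.description) before the cursor is closed, followed by spark.createDataFrame(rows_to_tuples(rows), schema=schema). Columns whose type falls through to the STRING fallback may fail schema verification, so extend the mapping as needed.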
#
# Make call using SELECT statement
#
# tuple of info (server, user, pwd, database, query)
info = ('svr4tips2030.database.windows.net', '<your user>', '<your pwd>', 'dbs4advwrks', 'select * from dbo.vDMPrep')
# get data frame
df2 = call_tsql(info)
(The original post included two screenshots here: the first showed the inferred types and the second showed the data.)
I created a very simple stored procedure which just selects from the view.
CREATE PROCEDURE [dbo].[StackOverFlowTest]
AS
BEGIN
SELECT * FROM [dbo].[vDMPrep]
END
GO
If we change the call to execute this stored procedure, we get the same answer.
#
# Make call using EXEC statement
#
# tuple of info (server, user, pwd, database, query)
info = ('svr4tips2030.database.windows.net', '<your user>', '<your pwd>', 'dbs4advwrks', 'exec [dbo].[StackOverFlowTest]')
# get data frame
df2 = call_tsql(info)
Below is a sample Spark DDL statement. See the file_schema part of the JSON document that is passed to a notebook as a parameter.
#
# Table 1 - dim.account
#
# Set parameters for notebook
parms = {
"datalake_path": "/mnt/datalake/bronze/",
"datafile_path": "/dim/account/dim-account-20200905T101530.csv",
"debug_flag": "false",
"partition_count": "2",
"file_schema": "AccountKey INT, ParentAccountKey INT, AccountCodeAlternateKey INT, ParentAccountCodeAlternateKey INT, AccountDescription STRING, AccountType STRING, Operator STRING, CustomMembers STRING, ValueType STRING, CustomMemberOptions STRING"
}
# Run notebook with selections
ret = dbutils.notebook.run("./nb-full-load-delta-table", 60, parms)
# Show return value if any
print(ret)
To recap, if you have performance issues, create a single-node cluster and scale up the compute on that one node. There is no sense in having multiple nodes, since the pymssql module will not use them.
Upvotes: 0
Reputation: 14925
This is part two of the answer. There is no good way to return results from a Stored Procedure call as a dataframe.
Here is a link on the MSFT github site for this driver stating that stored procedures are not supported.
https://github.com/microsoft/sql-spark-connector/issues/21
This is a hack, a workaround.
In my case, the SP does some work and saves the result to a staging table. Then use the technique from the JDBC answer to read the table.
The code below drops the table if it exists and then reloads it.
--
-- Sample Call
--
CREATE PROCEDURE dbo.StackOverFlowTest
AS
BEGIN
DROP TABLE IF EXISTS stage.DimSalesTerritory;
SELECT * INTO stage.DimSalesTerritory FROM dbo.DimSalesTerritory
END
Here is the code to get to the low-level Java driver manager. Its connection object has a method, prepareCall, that can call an SP.
#
# Grab the low level driver manager, exec sp
#
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
connection = driver_manager.getConnection(url, user_name, password)
connection.prepareCall("EXEC dbo.StackOverFlowTest").execute()
connection.close()
Then use spark.read to retrieve the data from the table filled by the SP.
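Putting the two steps together, a minimal sketch might look like the following. The function name run_procedure_then_read is hypothetical, spark is assumed to be an active SparkSession with the SQL Server JDBC driver available, and the try/finally is an addition so that the connection is released even if the procedure fails.

```python
def run_procedure_then_read(spark, url, user_name, password, proc_call, table_name):
    """Execute a stored procedure over JDBC, then read the table it filled."""
    # low-level JVM driver manager, as in the snippet above
    driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
    connection = driver_manager.getConnection(url, user_name, password)
    try:
        connection.prepareCall(proc_call).execute()
    finally:
        # always release the JDBC connection, even if the procedure fails
        connection.close()
    # read the staging table back as a Spark DataFrame
    return (
        spark.read
        .format("com.microsoft.sqlserver.jdbc.spark")
        .option("url", url)
        .option("dbtable", table_name)
        .option("user", user_name)
        .option("password", password)
        .load()
    )
```

For the procedure above, the call would be run_procedure_then_read(spark, url, user_name, password, "EXEC dbo.StackOverFlowTest", "stage.DimSalesTerritory").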
Upvotes: 0
Reputation: 14925
If you use pyodbc, the query runs as plain Python on a single node (the driver), not across the whole cluster. For larger data sets, this prevents full use of the cluster and causes performance issues.
It is better to use a spark driver for JDBC. Microsoft has one.
https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16
Create the complex T-SQL as views and just read them. That is what Spark was made for: reading data. The Spark JDBC driver also lets you read in parallel, if needed, by setting the partitioning options.
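As a rough illustration of those partitioning options, the sketch below builds the option set that Spark's built-in JDBC source uses for parallel reads (partitionColumn, lowerBound, upperBound, numPartitions). The helper name jdbc_partition_options and the column name in the usage comment are hypothetical. Whether the Microsoft connector handles these options identically is an assumption to verify against its documentation.

```python
def jdbc_partition_options(url, table_name, partition_column,
                           lower_bound, upper_bound, num_partitions):
    """Build Spark JDBC options for a partitioned (parallel) read."""
    return {
        "url": url,
        "dbtable": table_name,
        # must be a numeric, date, or timestamp column
        "partitionColumn": partition_column,
        # the bounds only set the partition stride; they do not filter rows
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
    }


# Usage in a notebook where `spark` exists (the column name is hypothetical):
# opts = jdbc_partition_options(url, "dbo.vDMPrep", "SomeNumericKey", 1, 100000, 4)
# df = (spark.read.format("jdbc")
#       .options(**opts)
#       .option("user", user_name)
#       .option("password", password)
#       .load())
```

Each of the numPartitions tasks then reads its own range of the partition column over a separate connection, which is what spreads the read across the executors.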
Install the Maven library for the correct version of Spark.
I am using Spark Version > 3.1.
I have the adventure works database with a view called v.
#
# Set connection properties
#
server_name = "jdbc:sqlserver://svr4tips2030.database.windows.net"
database_name = "dbs4advwrks"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "dbo.vDMPrep"
user_name = "enter your user here"
password = "enter your password here"
Make a typical spark.read() call with the JDBC driver.
df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", user_name) \
.option("password", password).load()
display(df)
(The original post showed a screenshot of the displayed DataFrame here.)
Is the data frame strictly typed? The answer is yes, since it gets the field information from SQL Server.
Last but not least, is the view complex? In the original post, a screenshot of the view definition showed that 8 tables are joined and aggregated to get the final result for the view.
In summary, use views in the database to pre-compile your data sets for Spark. Use the JDBC driver from Microsoft to read from and write to SQL Server using DataFrames.
As for the stored procedure, there is a way to use the driver to execute non-queries. I will have to look for the code. Stay tuned for an update or part 2.
Upvotes: 1