Armando Contestabile

Reputation: 300

Create a (Py)Spark dataframe from a SQL query in target dialect

In short, my need is to create a Spark dataframe from a more or less complex T-SQL query (SQL Server) and/or from the output of a SQL Server stored procedure.

As far as I understand, Spark does not allow executing queries in the dialect of the underlying data source. Yes, there is a way to obtain a low-level connection object and execute stored procedures, but that way I don't get a Spark DataFrame as output.

So I thought of running the query in the classic pyodbc way, fetching the results, and then building the Spark dataframe with SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True), providing both the data and the schema. I can obtain the data, but I can't build the schema (a list of (column name, data type) pairs) from the output cursor. Here is a working example that (generates and) extracts sample data from a local SQL Server instance:

import pyodbc

connection_string = "Driver={SQL Server};Server=LOCALHOST;Database=master;Trusted_Connection=yes;"
db_connection = pyodbc.connect(connection_string)

sql_query = """
SET NOCOUNT ON
DECLARE @TBL_TEST AS TABLE (
    column_1 INT NOT NULL PRIMARY KEY CLUSTERED IDENTITY(1, 1),
    column_2 VARCHAR(10) NOT NULL,
    column_3 VARCHAR(20) NULL,
    column_4 INT NOT NULL
)

INSERT INTO @TBL_TEST (column_2, column_3, column_4)
VALUES
('test1_col2', 'test1_col3', 100),
('test2_col2', 'test2_col3', 200),
('test3_col2', NULL, 300)

SET NOCOUNT OFF
SELECT t.* FROM @TBL_TEST AS t
"""

cursor = db_connection.cursor()
rows = cursor.execute(sql_query).fetchall()
cursor.close()
db_connection.close()

print(rows)

How can I extract the schema from the returned cursor and obtain a schema object to give to the createDataFrame() function?

Remember that my goal is the one stated above, so other approaches are also welcome!

Thank you in advance!

Upvotes: 0

Views: 4621

Answers (3)

CRAFTY DBA

Reputation: 14925

I hope this fits your use case. Again, this will not scale, since it runs only on the driver node. If you have a 5-node cluster, this will run on 1 node only.

With that said, we can let Spark infer the data types. If you look at the pyodbc documentation, you have to install a local ODBC driver. That is fine for Anaconda on a workstation but bad for a Spark cluster. Instead, use the pymssql module, which is self-contained native code. Install it using the PyPI section of the cluster libraries.

https://learn.microsoft.com/en-us/sql/connect/python/pymssql/python-sql-driver-pymssql?view=sql-server-ver16
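
Alternatively, on Databricks a notebook-scoped install also works (assuming a runtime that supports %pip magics):

%pip install pymssql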

Now that we have a driver, let's write a function that returns a dataframe from either a SELECT statement or a stored procedure call (EXEC).

#
#  Create function to call tsql + return df
#

# use module
import pymssql  

# define function
def call_tsql(info):
  
  # make connection
  conn = pymssql.connect(server=info[0], user=info[1], password=info[2], database=info[3])  
  
  # open cursor
  cursor = conn.cursor()  
  cursor.execute(info[4])
  
  # grab column data (name, type, ...)
  desc = cursor.description
  
  # grab data as list of tuples
  dat1 = cursor.fetchall()
  
  # close cursor
  conn.commit()
  conn.close()
  
  # extract column names
  col1 = list(map(lambda x: x[0], desc))
  
  # let spark infer data types
  df1 = spark.createDataFrame(data=dat1, schema=col1)
  
  # return dataframe
  return df1
  

There are tons of comments in the code. In short, a tuple with the connection information and the T-SQL is passed in as a parameter, and a dataframe is returned.

This code only supports one result set. Modify it if you need multiple result sets (MARS), as sketched below.
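
If you ever do need more than one result set from a batch, DB-API cursors expose nextset(); here is a rough sketch of how the fetch portion of call_tsql could change (it assumes the same connection and cursor as above and that every result set returns rows):

# sketch: collect one dataframe per result set returned by the batch
dataframes = []
while True:
    cols = [c[0] for c in cursor.description]   # column names of the current result set
    rows = cursor.fetchall()                     # rows of the current result set
    dataframes.append(spark.createDataFrame(data=rows, schema=cols))
    if not cursor.nextset():                     # None/False when no more result sets remain
        break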

Yes, there are data types in cursor.description, but they are encoded (type codes or Python types rather than Spark types) and I did not find a good mapping. Since you are not dealing with large data, let Spark infer the schema. Otherwise, pass a DDL string for the schema instead of the column names.
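
If you do want an explicit schema, one option is to build it from cursor.description yourself. The sketch below is based on pyodbc (where type_code is the Python class of the column values, as in the question's example); the type map is not exhaustive and the decimal precision/scale handling is a best guess:

import datetime
import decimal

from pyspark.sql.types import (StructType, StructField, StringType, LongType,
                               DoubleType, DecimalType, TimestampType, DateType,
                               BooleanType, BinaryType)

# rough mapping from the Python types reported by pyodbc to Spark types
TYPE_MAP = {
    str: StringType(),
    int: LongType(),                  # LongType is safer: BIGINT also maps to Python int
    float: DoubleType(),
    bool: BooleanType(),
    bytearray: BinaryType(),
    decimal.Decimal: DecimalType(38, 18),
    datetime.datetime: TimestampType(),
    datetime.date: DateType(),
}

def schema_from_cursor(description):
    # description rows are (name, type_code, display_size, internal_size,
    # precision, scale, null_ok); in pyodbc, type_code is a Python type
    fields = []
    for name, type_code, _, _, precision, scale, null_ok in description:
        spark_type = TYPE_MAP.get(type_code, StringType())   # fallback: extend the map as needed
        if type_code is decimal.Decimal and precision:
            spark_type = DecimalType(precision, scale or 0)
        fields.append(StructField(name, spark_type, nullable=bool(null_ok)))
    return StructType(fields)

# usage with the question's example:
#   cursor = db_connection.cursor()
#   cursor.execute(sql_query)
#   schema = schema_from_cursor(cursor.description)
#   df = spark.createDataFrame([tuple(r) for r in cursor.fetchall()], schema=schema)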

#
# Make call using SELECT statement
#

# tuple of info (server, user, pwd, database, query)
info = ('svr4tips2030.database.windows.net', '<your user>', '<your pwd>', 'dbs4advwrks', 'select * from dbo.vDMPrep')

# get data frame
df2 = call_tsql(info)

[Image: inferred data types of the dataframe]

The image above shows the inferred types and the image below shows the data.

[Image: dataframe rows returned by the query]

I created a very simple stored procedure which just selects from the view.

CREATE PROCEDURE [dbo].[StackOverFlowTest]
AS
BEGIN
    SELECT * FROM [dbo].[vDMPrep]
END
GO

If we change the call to execute this stored procedure, we get the same answer.

#
# Make call using EXEC statement
#

# tuple of info (server, user, pwd, database, query)
info = ('svr4tips2030.database.windows.net', '<your user>', '<your pwd>', 'dbs4advwrks', 'exec [dbo].[StackOverFlowTest]')

# get data frame
df2 = call_tsql(info)

Enclosed is a sample Spark DDL schema string. See the file_schema entry of the parameter dictionary that is passed to a notebook.

#
# Table 1 - dim.account
#

# Set parameters for notebook
parms = {
"datalake_path": "/mnt/datalake/bronze/",
"datafile_path": "/dim/account/dim-account-20200905T101530.csv",
"debug_flag": "false",
"partition_count": "2",
"file_schema": "AccountKey INT, ParentAccountKey INT, AccountCodeAlternateKey INT, ParentAccountCodeAlternateKey INT, AccountDescription STRING, AccountType STRING, Operator STRING, CustomMembers STRING, ValueType STRING, CustomMemberOptions STRING"
}

# Run notebook with selections
ret = dbutils.notebook.run("./nb-full-load-delta-table", 60, parms)

# Show return value if any
print(ret)
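
The notebook itself is not shown here, but inside it the file_schema string can be handed straight to the reader. A minimal sketch, assuming the notebook picks up its parameters with dbutils.widgets:

# sketch: inside nb-full-load-delta-table, apply the passed DDL schema to the CSV read
file_schema   = dbutils.widgets.get("file_schema")
datalake_path = dbutils.widgets.get("datalake_path")
datafile_path = dbutils.widgets.get("datafile_path")

df = (spark.read
        .format("csv")
        .option("header", "true")
        .schema(file_schema)            # a DDL string is accepted directly as a schema
        .load(datalake_path + datafile_path))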

To recap, if you have performance issues, create a single-node cluster and scale the compute on that one node. There is no sense in having multiple nodes, since they will not be used by the pymssql module.

Upvotes: 0

CRAFTY DBA

Reputation: 14925

This is part two of the answer. There is no good way to return results from a Stored Procedure call as a dataframe.

Here is a link on the MSFT github site for this driver stating that stored procedures are not supported.

https://github.com/microsoft/sql-spark-connector/issues/21

This is a hack / workaround.

In my case, the SP does some work and saves the result to a staging table. Then I use the technique above to read that table.

The code below drops the table if it exists and then reloads it.

-- 
-- Sample Call
-- 
CREATE PROCEDURE dbo.StackOverFlowTest
AS
BEGIN
    DROP TABLE IF EXISTS stage.DimSalesTerritory;
    SELECT * INTO stage.DimSalesTerritory FROM dbo.DimSalesTerritory
END

Here is the code to get at the low-level Java DriverManager, which lets you prepare and execute a stored procedure call.

#
#  Grab the low level driver manager, exec sp
#

# url, user_name and password are the same JDBC connection values
# used with the Spark connector in my other answer
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
connection = driver_manager.getConnection(url, user_name, password)
connection.prepareCall("EXEC dbo.StackOverFlowTest").execute()
connection.close()

Then use spark.read() to retrieve the data from the new table filled by the SP.
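
A sketch of that read, reusing the Microsoft connector and the url, user_name and password variables defined in my other answer:

#
#  Read the staging table filled by the stored procedure
#

df_stage = (spark.read
              .format("com.microsoft.sqlserver.jdbc.spark")
              .option("url", url)
              .option("dbtable", "stage.DimSalesTerritory")
              .option("user", user_name)
              .option("password", password)
              .load())

display(df_stage)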

[Image: results of reading the staging table with spark.read()]

Upvotes: 0

CRAFTY DBA

Reputation: 14925

If you use pyodbc, the query runs on just one node (the driver), not across the whole cluster. For larger data sets, this prevents full use of the cluster and causes performance issues.

It is better to use a Spark JDBC connector. Microsoft has one.

https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16

Create the complex T-SQL as views and just read them. That is what Spark was made for: reading data. Using the JDBC connector, Spark can also read in parallel if needed by changing the partitioning options (see the sketch further down).

Install the Maven library for the correct version of Spark.

I am using Spark Version > 3.1.

I have the AdventureWorks database with a view called dbo.vDMPrep.

#
#  Set connection properties
#

server_name = "jdbc:sqlserver://svr4tips2030.database.windows.net"
database_name = "dbs4advwrks"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "dbo.vDMPrep"
user_name = "enter your user here"
password = "enter your password here"

Make a typical spark.read() call with the JDBC driver.

df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", user_name) \
        .option("password", password).load()

display(df)

Here are the results of displaying the dataframe.

[Image: dataframe output from display(df)]
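
As mentioned above, the same read can be parallelized by adding the standard JDBC partitioning options. A sketch, where the partition column and bounds are hypothetical and must match a numeric column in your view:

#
#  Optional: parallel read with range partitioning (hypothetical column and bounds)
#

df_par = (spark.read
            .format("com.microsoft.sqlserver.jdbc.spark")
            .option("url", url)
            .option("dbtable", table_name)
            .option("user", user_name)
            .option("password", password)
            .option("partitionColumn", "CalendarYear")   # pick a numeric/date column from the view
            .option("lowerBound", "2010")
            .option("upperBound", "2014")
            .option("numPartitions", "4")
            .load())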

Is the dataframe strictly typed? The answer is yes, since it gets the field information from SQL Server.
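
You can confirm this with a quick check:

# prints each column with the type mapped from SQL Server
df.printSchema()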

[Image: dataframe schema showing the types mapped from SQL Server]

Last but not least, is the view complex? The image below shows that 8 tables are joined and aggregated to get the final result for the view.

[Image: view definition joining and aggregating 8 tables]

In summary, use views in the database to pre-compile your data sets for Spark. Use the JDBC connector from Microsoft to read from and write to SQL Server with dataframes.

As for stored procedures, there is a way to use the driver to execute non-queries. I will have to look for the code. Stay tuned for an update or part 2.

Upvotes: 1
