Techno04335

Reputation: 1445

Cannot Insert into SQL using PySpark, but works in SQL

I have created the table below in SQL Server using the following:

CREATE TABLE [dbo].[Validation](
    [RuleId] [int] IDENTITY(1,1) NOT NULL,
    [AppId] [varchar](255) NOT NULL,
    [Date] [date] NOT NULL,
    [RuleName] [varchar](255) NOT NULL,
    [Value] [nvarchar](4000) NOT NULL
)

Note the identity column (RuleId).

Inserting values into the table directly in SQL works:

Note: the primary key (RuleId) is not supplied in the insert, as it autofills (starting from 1 on an empty table) and increments on each insert.

INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')

However, when I create a temp view on Databricks and run the same insert through PySpark as below:

%python

driver = "<Driver>"
url = "jdbc:sqlserver://<URL>"
database = "<db>"
table = "dbo.Validation"
user = "<user>"
password = "<pass>"

# read the remote table over JDBC and expose it as a temp view
remote_table = spark.read.format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("database", database) \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .load()

remote_table.createOrReplaceTempView("YOUR_TEMP_VIEW_NAMES")

sqlContext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

I get the error below:

AnalysisException: 'unknown requires that the data to be inserted have the same number of columns as the target table: target table has 5 column(s) but the inserted data has 4 column(s), including 0 partition column(s) having constant value(s).;'

Why does this work in SQL but fail when the query is passed through Databricks? How can I insert through PySpark without getting this error?

Upvotes: 0

Views: 3761

Answers (1)

David Browne - Microsoft

Reputation: 88851

Spark SQL's INSERT requires a value for every column of the target table, including the IDENTITY column (hence "target table has 5 column(s) but the inserted data has 4 column(s)"), whereas SQL Server fills the identity column for you. The most straightforward solution here is to use JDBC from a Scala cell, e.g.:

%scala

import java.util.Properties
import java.sql.DriverManager

val jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
val jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

// Create a Properties() object to hold the parameters;
// spark.read.jdbc reuses these in the Spark-only solution below.

val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
connectionProperties.setProperty("Driver", driverClass)

// open a plain JDBC connection and run the statement directly against SQL Server
val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val stmt = connection.createStatement()
val sql = "INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')"

stmt.execute(sql)
connection.close()

You could use pyodbc too, but the SQL Server ODBC drivers aren't installed by default, and the JDBC drivers are.
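For completeness, a minimal pyodbc sketch, assuming the Microsoft ODBC driver (msodbcsql17 here) has already been installed on the cluster; the server, database, and credential values are placeholders:

%python

import pyodbc

# assumes msodbcsql17 has been installed on the cluster, e.g. via an init script
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=xxxx.database.windows.net,1433;"
    "DATABASE=<db>;UID=<user>;PWD=<pass>"
)
cursor = conn.cursor()
cursor.execute("INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")
conn.commit()   # pyodbc does not autocommit by default
conn.close()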

A Spark-only solution would be to create a view in SQL Server that excludes the identity column, and insert against that, e.g.:

create view Validation2 as
select AppId,Date,RuleName,Value
from Validation

then, from PySpark:

tableName = "Validation2"
df = spark.read.jdbc(url=jdbcUrl, table=tableName, properties=connectionProperties)
df.createOrReplaceTempView(tableName)
sqlContext.sql("INSERT INTO Validation2 VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")

If you want to encapsulate the Scala and call it from another language (like Python), you can use a Scala package cell, e.g.:

%scala

package example

import java.util.Properties
import java.sql.DriverManager

// minimal facade: open a connection, execute one statement, always close it
object JDBCFacade
{
  def runStatement(url : String, sql : String, userName : String, password: String): Unit = 
  {
    val connection = DriverManager.getConnection(url, userName, password)
    val stmt = connection.createStatement()
    try
    {
      stmt.execute(sql)  
    }
    finally
    {
      connection.close()  
    }
  }
}

and then you can call it from a Python cell like this:

jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")

jdbcUrl = "jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

sql = "select 1 a into #foo from sys.objects"

sc._jvm.example.JDBCFacade.runStatement(jdbcUrl,sql, jdbcUsername, jdbcPassword)
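and, in the same way, the insert from the question:

sql = "INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')"
sc._jvm.example.JDBCFacade.runStatement(jdbcUrl, sql, jdbcUsername, jdbcPassword)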

Upvotes: 1
