nidsk

Reputation: 11

How do I use the Spark connector in Databricks to do a bulk insert into SQL Server?

I have a DataFrame in Databricks which I am trying to bulk insert into SQL Server. I followed this tutorial on Microsoft's website, specifically using this code:


# df is created as a Dataframe, with 1000 rows of sample data

server_name = "jdbc:sqlserver://x.database.windows.net"
database_name = "dbTest"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "dbo.Bulk"
username = "user123"
password = "Password123"

df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()

However, this produces the following error:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.schemaString(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Lscala/Option;)Ljava/lang/String;

A more detailed error log:

Py4JJavaError                             Traceback (most recent call last)
<command-2622503877398381> in <module>
      7 password = "********"
      8 
----> 9 df_countries.write \
     10             .format("com.microsoft.sqlserver.jdbc.spark") \
     11             .mode("overwrite") \

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
   1132             self.format(format)
   1133         if path is None:
-> 1134             self._jwrite.save()
   1135         else:
   1136             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    108     def deco(*a, **kw):
    109         try:
--> 110             return f(*a, **kw)
    111         except py4j.protocol.Py4JJavaError as e:
    112             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1515.save.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.schemaString(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Lscala/Option;)Ljava/lang/String;
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.mssqlCreateTable(BulkCopyUtils.scala:506)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceConnector$.createTable(SingleInstanceConnector.scala:46)
    at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:90)
    at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:64)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:71)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:163)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:162)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:468)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:311)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

What am I doing wrong?

Upvotes: 1

Views: 7277

Answers (3)

Han Gao

Reputation: 11

I had the exact same error because I was running Databricks Runtime 10.5 (Scala 2.12, Spark 3.2.1). Switch the Databricks Runtime to 9.1 LTS (Spark 3.1.2, Scala 2.12) and the same code should go through.
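
If you would rather stay on a newer runtime, it is worth confirming which Spark version the cluster actually ships before picking a connector build, since the connector is compiled against specific Spark versions. A quick sketch, assuming a Databricks notebook where spark is already defined (the Maven coordinate in the comment is my assumption, so verify it against the connector's GitHub releases page):

# Print the Spark version the cluster runtime ships, so the spark-mssql-connector
# build installed as a cluster library can be matched to it (for example, the
# com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 artifact targets Spark 3.1.x --
# an assumed coordinate, check the connector's releases page).
print(spark.version)               # e.g. "3.1.2" on Databricks Runtime 9.1 LTS
print(spark.sparkContext.version)  # same value, read from the SparkContext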

To speed up your bulk insert, set the tableLock option to true in your bulk insert code; the sql-spark-connector GitHub project has benchmarks for the different options.

df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .option("tableLock", "true") \
    .save()

Upvotes: 0

Razvan Ipate

Reputation: 21

I'm having the same issue.

The problem started when we wanted to use the newest cluster runtime (8.3 at the time of writing this). Going back to runtime 7 makes it work fine.

According to the Databricks Runtime 8.0 migration guide: "Databricks Runtime 8.0 changes the default format to delta to make it simpler to create a Delta table." - as opposed to parquet.

https://docs.databricks.com/release-notes/runtime/8.0-migration.html

I thought this might be the cause, since the error only happens on runtime 8, but I tried implementing Option 1 (adding the two configurations to the cluster, sketched below) and the error is still happening.
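
For anyone wondering what that refers to: as I read the linked guide, Option 1 means adding two entries to the cluster's Spark config so new tables default back to Parquet instead of Delta. A rough sketch of the equivalent session-level settings from a notebook (the config keys are my reading of that page, so double-check them there):

# Session-level equivalent of the two cluster Spark config entries from the
# migration guide (Option 1 sets these on the cluster itself):
spark.conf.set("spark.sql.sources.default", "parquet")
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")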

I guess for now we're going to leave it on runtime 7, but at some point we'll need to upgrade.

Upvotes: 2

ASH

Reputation: 20302

I haven't used Databricks in at least 18 months, but this worked for me the last time I was using that technology.

from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

# Schema for the pipe-delimited source files.
customSchema = StructType([
    StructField("asset_id", StringType(), True),
    StructField("price_date", StringType(), True),
    # etc. -- remaining fields elided here
    StructField("close_price", StringType(), True),
    StructField("filename", StringType(), True)])

fullPath = 'path_to files_and_all_credentials'

# Read the delimited files with the explicit schema and tag each row with its source file.
df = spark.read.format("csv") \
   .option("header", "false") \
   .option("sep", "|") \
   .schema(customSchema) \
   .load(fullPath) \
   .withColumn("filename", input_file_name())

# And, to write to DB.

import pandas as pd
url = "jdbc:sqlserver://server_name.database.windows.net:1433;databaseName=db_name"
props = {"user": "usr", "password": "pwd", "batchsize": "500000"}

pd.set_option('display.max_columns', None)
#df.printSchema()
#df.show()

# Append the DataFrame to the SQL Server table over JDBC; "batchsize" controls
# how many rows are sent per JDBC batch.
df.write.mode('append').jdbc(url, "dbo.table_name", properties=props)

As an aside, you could try this.

spark_jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:sqlserver://yourserver.database.windows.net:1433") \
    .option("dbtable", "<your table name>") \
    .option("user", "username") \
    .option("password", "password") \
    .save()


# Saving data to a JDBC source
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

Upvotes: 0
