Reputation: 11
I have a DataFrame in Databricks which I am trying to bulk insert into SQL Server. I have followed this tutorial on Microsoft's website, specifically using this code:
# df is created as a Dataframe, with 1000 rows of sample data
server_name = "jdbc:sqlserver://x.database.windows.net"
database_name = "dbTest"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "dbo.Bulk"
username = "user123"
password = "Password123"
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
However, this produces the following error:
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.schemaString(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Lscala/Option;)Ljava/lang/String;
A more detailed error log:
Py4JJavaError Traceback (most recent call last)
<command-2622503877398381> in <module>
7 password = "********"
8
----> 9 df_countries.write \
10 .format("com.microsoft.sqlserver.jdbc.spark") \
11 .mode("overwrite") \
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
1132 self.format(format)
1133 if path is None:
-> 1134 self._jwrite.save()
1135 else:
1136 self._jwrite.save(path)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
108 def deco(*a, **kw):
109 try:
--> 110 return f(*a, **kw)
111 except py4j.protocol.Py4JJavaError as e:
112 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1515.save.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.schemaString(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Lscala/Option;)Ljava/lang/String;
at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.mssqlCreateTable(BulkCopyUtils.scala:506)
at com.microsoft.sqlserver.jdbc.spark.SingleInstanceConnector$.createTable(SingleInstanceConnector.scala:46)
at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:90)
at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:64)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:71)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:94)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:162)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1079)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1079)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:468)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:311)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
What am I doing wrong?
Upvotes: 1
Views: 7277
Reputation: 11
I had the exact same error because I was running Databricks Runtime 10.5 (Scala 2.12, Spark 3.2.1). Switch the Databricks Runtime to 9.1 LTS (Spark 3.1.2, Scala 2.12) and the same code should go through.
To speed up your bulk insert, set the tableLock option to true in your bulk insert code. The sql-spark-connector GitHub project has benchmarks for the different options.
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
**.option("tableLock", "true") \**
.save()
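If you are not sure which Spark version your cluster is actually running (and therefore which connector build it needs), a quick sanity check from a notebook cell is (a minimal sketch; spark is the session Databricks provides):
# The connector must be built against the Spark version the cluster runs;
# print it to confirm before pinning a runtime.
print(spark.version)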
Upvotes: 0
Reputation: 21
I'm having the same issue.
The problem started when we wanted to use the newest cluster runtime (8.3 at the time of writing this). Going back to runtime 7 makes it work fine.
According to Databricks' Runtime 8.0 migration guide: "Databricks Runtime 8.0 changes the default format to delta to make it simpler to create a Delta table." - as opposed to parquet.
https://docs.databricks.com/release-notes/runtime/8.0-migration.html
Since the error only happens on runtime 8, I thought this change might be the cause, but I tried implementing Option 1 from the guide (adding the two configurations to the cluster, sketched below) and the error is still happening.
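For reference, a sketch of the two cluster Spark configs that guide's Option 1 describes, quoted from memory, so verify them against the linked page before relying on them:
# Cluster Spark config: restore parquet as the default table format
spark.sql.sources.default parquet
spark.sql.legacy.createHiveTableByDefault true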
I guess for now we're going to stay on runtime 7, but at some point we'll need to upgrade.
Upvotes: 2
Reputation: 20302
I haven't used Databricks in at least 18 months, but this worked for me the last time I used it.
from pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
customSchema = StructType([
StructField("asset_id", StringType(), True),
StructField("price_date", StringType(), True),
# ... the remaining columns, elided in the original ...
StructField("close_price", StringType(), True),
StructField("filename", StringType(), True)])
fullPath = 'path_to files_and_all_credentials'
df = spark.read.format("csv") \
.option("header", "false") \
.option("sep","|") \
.schema(customSchema) \
.load(fullPath) \
.withColumn("filename", input_file_name())
# And, to write to DB.
import pandas as pd
url = "jdbc:sqlserver://server_name.database.windows.net:1433;databaseName=db_name"
props = {"user": "usr","password": "pwd", "batchsize" : "500000"}
pd.set_option('display.max_columns', None)
#df.printSchema()
#df.show()
df.write.mode('append').jdbc(url,"dbo.table_name",properties=props)
As an aside, you could try this.
spark_jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:sqlserver://yourserver.database.windows.net:1433") \
.option("dbtable", "<your table name>") \
.option("user", "username") \
.option("password", "password") \
.save()
# Saving data to a JDBC source
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
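One caveat with the generic jdbc format used in the last two snippets: unlike the Microsoft connector, it sometimes needs the driver class named explicitly. A minimal sketch, assuming the SQL Server JDBC driver JAR is already on the cluster:
# Same generic JDBC write, naming the SQL Server driver class so Spark
# loads it directly instead of relying on DriverManager to match the URL.
df.write \
.format("jdbc") \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("url", "jdbc:sqlserver://yourserver.database.windows.net:1433;databaseName=db_name") \
.option("dbtable", "dbo.table_name") \
.option("user", "username") \
.option("password", "password") \
.save()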
Upvotes: 0