NinjaDev

Reputation: 1258

Can't write a big DataFrame into MSSQL Server using the JDBC driver on Azure Databricks

I'm reading a huge CSV file containing 39,795,158 records and writing it into an MSSQL server, on Azure Databricks. The Databricks notebook runs on a cluster with 56 GB of memory and 16 cores per node, and 12 workers.

This is my code in Python and PySpark:

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import sleep

# server, port, database, username, password, tablename, and the schema `s`
# are defined in earlier notebook cells.
url = "jdbc:sqlserver://{0}:{1};database={2}".format(server, port, database)
spark.conf.set("spark.databricks.io.cache.enabled", True)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Read csv file.
df_lake = spark.read \
    .option('header', 'false') \
    .schema(s) \
    .option('delimiter', ',') \
    .csv('wasbs://...')


batch_size = 60000
rows = df_lake.count()
org_pts = df_lake.rdd.getNumPartitions() # 566
new_pts = 1990

# Re-partition the DataFrame
df_repartitioned = df_lake.repartition(new_pts)

# Write the DataFrame into the MSSQL server using the JDBC driver.
df_repartitioned.write \
            .format("jdbc") \
            .mode("overwrite") \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .option("url", url) \
            .option("dbtable", tablename) \
            .option("user", username) \
            .option("password", password) \
            .option("batchsize", batch_size) \
            .save()
sleep(10)

Then I got the following logs and errors:

rows: 39795158
org_pts: 566
new_pts: 1990

Copy error:  An error occurred while calling o9647.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 62 in stage 462.0 failed 4 times, most recent failure: Lost task 62.3 in stage 462.0 (TID 46609) (10.139.64.12 executor 27): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1217)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3508)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:728)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:857)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2517)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2828)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2775)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2769)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2769)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1305)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1305)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3036)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2977)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2965)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1067)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2477)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2460)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2498)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2517)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2542)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1025)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:419)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1023)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:855)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:63)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:257)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:167)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:166)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1080)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:130)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1080)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:469)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:439)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:312)
    at sun.reflect.GeneratedMethodAccessor448.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1217)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3508)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:728)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:857)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2517)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

For 3 to 6 million records there was no problem, but it failed for 10 million or more records. I'm not sure why it fails at 10 million records and above.

Are there any solutions for processing huge DataFrames on Azure Databricks?

Upvotes: 1

Views: 1528

Answers (2)

NinjaDev

Reputation: 1258

I solved it by reducing the memory and cores of my cluster. I set up the cluster again with 14 GB of memory, 4 cores, and 8 workers, and it worked: the write completes without any error. I'm not sure why it failed with the bigger cluster settings.

Upvotes: 0

Abhishek Khandave

Reputation: 3240

Using too many partitions when reading from or writing to an external database risks overloading that database with too many concurrent queries. Most DBMS systems have limits on the number of concurrent connections. As a starting point, aim for a number of partitions close to the number of cores or task slots in your Spark cluster, in order to maximize parallelism while keeping the total number of queries capped at a reasonable limit.
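For the write side of the question above, one way to apply this is to cap the concurrent JDBC connections with the numPartitions option of the JDBC writer; if the DataFrame has more partitions than that, Spark coalesces it down to the limit before writing. A minimal sketch, reusing the url, tablename, username, password and batch_size variables from the question; the cap of 64 is only illustrative:

# Cap concurrent JDBC connections on the write; Spark calls
# coalesce(numPartitions) internally if the DataFrame has more partitions.
max_connections = 64  # illustrative cap, tune to what the database allows

df_lake.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .option("user", username) \
    .option("password", password) \
    .option("numPartitions", max_connections) \
    .option("batchsize", batch_size) \
    .save()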

Workaround

If you need lots of parallelism after fetching the JDBC rows (because you’re doing something CPU-bound in Spark) but don’t want to issue too many concurrent queries to your database, consider using a lower numPartitions for the JDBC read and then doing an explicit repartition() in Spark afterwards, as sketched below.
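A minimal sketch of that read-side pattern, assuming a hypothetical source table src_table with a numeric id column and reusing the connection variables from the question; the bounds, the cap of 16 connections, and the repartition target of 1990 are only illustrative:

# Read over JDBC with a small, capped number of concurrent queries.
# partitionColumn/lowerBound/upperBound describe how to split the table;
# "id" and the bounds are hypothetical and must match the real source table.
jdbc_df = spark.read \
    .format("jdbc") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", url) \
    .option("dbtable", "src_table") \
    .option("user", username) \
    .option("password", password) \
    .option("partitionColumn", "id") \
    .option("lowerBound", 0) \
    .option("upperBound", 40000000) \
    .option("numPartitions", 16) \
    .load()

# Then repartition in Spark for the CPU-bound work, without adding
# extra load on the database.
wide_df = jdbc_df.repartition(1990)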

Refer to this official doc.

Upvotes: 1
