Reputation: 25770
In order to handle Spark exceptions on RDD operations, I can use the following approach with an additional exceptions column:
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val df: DataFrame = ...
val rddWithExcep = df.rdd.map { row: Row =>
  val memberIdStr = row.getAs[String]("member_id")
  // Either the parsed integer with a null error, or a null value with the error message
  val memberIdInt = Try(memberIdStr.toInt) match {
    case Success(integer) => List(integer, null)
    case Failure(ex)      => List(null, ex.toString)
  }
  Row.fromSeq(row.toSeq.toList ++ memberIdInt)
}
val castWithExceptionSchema = StructType(df.schema.fields ++ Array(
  StructField("member_id_int", IntegerType, true),
  StructField("exceptions", StringType, true)))
val castExcepDf = sparkSession.sqlContext.createDataFrame(rddWithExcep, castWithExceptionSchema)
castExcepDf.printSchema()
castExcepDf.show()
Is it possible to handle such exceptions in Spark SQL? Currently, in case of any error, Spark SQL simply returns a null value and hides the error.
For example, a division by zero results in a null value instead of an error.
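To illustrate, a minimal sketch of that default behavior (assuming the same sparkSession as above):
// With default settings the query does not fail:
// the result of the division is simply null.
sparkSession.sql("SELECT 1/0 AS divideByZero").show()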
In my opinion this is a very serious issue in Spark SQL, because it can silently produce unexpected/wrong data that you won't even notice.
Is it possible to override this behavior and let Spark fail with an appropriate, detailed exception?
Upvotes: 10
Views: 3381
Reputation: 5068
Since Spark 3.0, you can set the property spark.sql.ansi.enabled to true in your Spark session to throw an exception and stop the Spark execution instead of saving a null value in the column. However, the failure is global and not on a row-by-row basis. See the ANSI Compliance page on Spark's documentation website for more details.
So the following code snippet:
import org.apache.spark.sql.functions.col
import sparkSession.implicits._

sparkSession.conf.set("spark.sql.ansi.enabled", "true")

Seq(1, 2, 3).toDF("MyCol")
  .withColumn("divideByZero", col("MyCol") / 0)
  .show(false)
throws the following exception:
Exception in thread "main" org.apache.spark.SparkArithmeticException: divide by zero
at org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:140)
at org.apache.spark.sql.catalyst.expressions.DivModLike.eval(arithmetic.scala:437)
at org.apache.spark.sql.catalyst.expressions.DivModLike.eval$(arithmetic.scala:425)
at org.apache.spark.sql.catalyst.expressions.Divide.eval(arithmetic.scala:534)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:168)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:97)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.$anonfun$applyOrElse$80(Optimizer.scala:1840)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1840)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$43.applyOrElse(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
at org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode.mapChildren(LogicalPlan.scala:206)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:486)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:447)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1835)
at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$.apply(Optimizer.scala:1833)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
...
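Note that the property can also be enabled when the session is built, so the whole application runs in ANSI mode (a sketch; the application name and master are only illustrative):

import org.apache.spark.sql.SparkSession

// Enable ANSI mode for the whole session at creation time.
val sparkSession = SparkSession.builder()
  .appName("ansi-example")
  .master("local[*]")
  .config("spark.sql.ansi.enabled", "true")
  .getOrCreate()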
Upvotes: 2