Reputation: 799
I'm trying to handle common exceptions in Spark, like a .map operation not working correctly on all elements of the data or a FileNotFound exception. I have read all the existing questions and the following two posts:
https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark
I have tried a Try statement within the line attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble so that it reads attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble). But that won't compile; the compiler no longer recognises the .toDF() statement later. I have also tried a Java-like try { } catch { } block but can't get the scope right; df is then not returned.
Does anyone know how to do this properly? Do I even need to handle these exceptions, given that the Spark framework already seems to deal with a FileNotFoundException without me adding a handler? I would, however, like to generate an error that states the number of fields in the schema if the input file has the wrong number of columns, for example.
Here's the code:
object DataLoadTest extends SparkSessionWrapper {
/** Helper function to create a DataFrame from a textfile, re-used in subsequent tests */
def createDataFrame(fileName: String): DataFrame = {
import spark.implicits._
//try {
val df = spark.sparkContext
.textFile("/path/to/file" + fileName)
.map(_.split("\\t"))
//mHealth user is the case class which defines the data schema
.map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
attributes(3).toDouble, attributes(4).toDouble,
attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
attributes(23).toInt))
.toDF()
.cache()
df
} catch {
case ex: FileNotFoundException => println(s"File $fileName not found")
case unknown: Exception => println(s"Unknown exception: $unknown")
}
}
All suggestions appreciated. Thanks!
Upvotes: 7
Views: 57871
Reputation: 1078
Apply a try/catch block to the DataFrame columns:
(try{$"credit.amount"} catch{case e:Exception=> lit(0)}).as("credit_amount")
Upvotes: 0
Reputation: 311
Another option would be to use the Try type in Scala.
For example:
import java.io.FileNotFoundException
import scala.util.{Failure, Success, Try}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    //create dataframe df
    Success(df)
  } catch {
    case ex: FileNotFoundException => {
      println(s"File $fileName not found")
      Failure(ex)
    }
    case unknown: Exception => {
      println(s"Unknown exception: $unknown")
      Failure(unknown)
    }
  }
}
Now, on the caller side, handle it like this:
createDataFrame("file1.csv") match {
  case Success(df) => {
    // proceed with your pipeline
  }
  case Failure(ex) => //handle exception
}
This is slightly better than using Option, as the caller knows the reason for the failure and can handle it accordingly.
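The same idea also works per record, which covers the wrong-number-of-columns case from the question. Here is a minimal sketch in plain Scala with no Spark dependency; the Reading case class, the parseLine helper and the three-field layout are hypothetical stand-ins for mHealthUser and its 24 columns. Wrapping the case class in Try inside .map yields Try[mHealthUser], for which Spark has no encoder as far as I know, which is the likely reason .toDF() stopped compiling; the successes would have to be unwrapped first.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for mHealthUser, cut down to three fields.
final case class Reading(x: Double, y: Double, label: Int)

object LineParser {
  val ExpectedFields = 3

  // Parse one tab-separated line; report the field count when it is wrong.
  def parseLine(line: String): Try[Reading] = {
    // limit -1 keeps trailing empty fields so the count is accurate
    val attributes = line.split("\t", -1)
    if (attributes.length != ExpectedFields)
      Failure(new IllegalArgumentException(
        s"Expected $ExpectedFields fields but found ${attributes.length}: $line"))
    else
      // toDouble / toInt throw NumberFormatException, which Try captures.
      Try(Reading(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toInt))
  }
}
```

Inside a Spark job, a function like this could be applied in .map, with the Success values extracted (for example via .flatMap(_.toOption)) before calling .toDF(), and the Failure values counted or logged separately.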
Upvotes: 17
Reputation: 27373
Either you let the Exception be thrown out of the createDataFrame method (and handle it outside), or change the signature to return Option[DataFrame]:
def createDataFrame(fileName: String): Option[DataFrame] = {
  import spark.implicits._
  try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\\t"))
      //mHealth user is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    Some(df)
  } catch {
    case ex: FileNotFoundException => {
      println(s"File $fileName not found")
      None
    }
    case unknown: Exception => {
      println(s"Unknown exception: $unknown")
      None
    }
  }
}
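One caveat worth checking before relying on this: Spark transformations such as textFile, map and cache are lazy, so a try/catch placed around them may only see errors raised while the pipeline is being defined. Parse errors inside .map typically surface later, when an action (count, show, write and so on) forces evaluation. The sketch below illustrates the effect with a plain Scala Iterator instead of Spark, so it is only an analogy; LazyDemo, build and run are hypothetical names.

```scala
import scala.util.Try

object LazyDemo {
  // Building the lazy pipeline succeeds even though "x" cannot be parsed,
  // because Iterator.map does not evaluate anything yet.
  def build(raw: Seq[String]): Try[Iterator[Double]] =
    Try(raw.iterator.map(_.toDouble))

  // Forcing the pipeline with toList is where the NumberFormatException
  // appears, so this Try captures it.
  def run(raw: Seq[String]): Try[List[Double]] =
    Try(raw.iterator.map(_.toDouble).toList)
}
```

If the same holds for your job, the error handling has to wrap the action as well, or the records have to be validated individually inside the transformation.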
EDIT: On the caller side of createDataFrame there are several patterns. If you are processing several filenames, you can do, for example:
val dfs : Seq[DataFrame] = Seq("file1","file2","file3").map(createDataFrame).flatten
If you are working on a single filename, you can do:
createDataFrame("file1.csv") match {
  case Some(df) => {
    // proceed with your pipeline
    val df2 = df.filter($"activityLabel" > 0).withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))
  }
  case None => println("could not create dataframe")
}
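Note that in the multi-file pattern, flatten silently discards the files that failed. If the failure reasons matter, a variant built on Try can keep both sides. Below is a rough sketch in plain Scala; the load function is a hypothetical placeholder for createDataFrame that returns a Try[String] instead of a DataFrame so that the example runs without Spark.

```scala
import java.io.FileNotFoundException
import scala.util.{Failure, Success, Try}

object MultiFileLoad {
  // Hypothetical placeholder for createDataFrame: "loads" a file name
  // and fails for names that do not end in .csv.
  def load(fileName: String): Try[String] =
    if (fileName.endsWith(".csv")) Success(s"data from $fileName")
    else Failure(new FileNotFoundException(fileName))

  // Split results into loaded values and (fileName, error message) pairs.
  def loadAll(fileNames: Seq[String]): (Seq[String], Seq[(String, String)]) = {
    val results = fileNames.map(name => name -> load(name))
    val loaded = results.collect { case (_, Success(value)) => value }
    val failed = results.collect { case (name, Failure(ex)) => name -> ex.getMessage }
    (loaded, failed)
  }
}
```

The caller can then continue with the loaded values and report or retry the failed file names, rather than losing them.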
Upvotes: 2