Reputation: 369
How do I do exception handling in Spark - Scala for invalid records? Here is my code:
val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
val rowRDMapped = rowRDD.map { x => (x.get(1), x.get(10)) }
val DF = rowRDMapped.toDF("ID", "name" )
Everything works fine if the input data is fine. If I don't have enough fields, I get an ArrayIndexOutOfBoundsException.
I am trying to put a try-catch around it, but I am not able to skip the records with invalid data via try-catch:
val rowRDMapped = rowRDD.map { x =>
  try {
    (x.get(1), x.get(10))
  } catch {
    case e: Exception =>
      println("Invalid Data")
      // Here it expects a tuple to be returned, but I am not sure what to do here,
      // since I don't want any data to be returned for this record.
  }
}
Please let me know how to solve the issue with try-catch, and if there is a better solution, that would also help a lot.
Upvotes: 0
Views: 2377
Reputation: 31
Instead of try-catch, you can use Try.
The code below will filter out the data lines which don't have enough fields and build a DataFrame from the remaining ones.
val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
val rowRDMapped = rowRDD.flatMap { x => Try((x.getString(1), x.getString(10))).toOption }
val DF = rowRDMapped.toDF("ID", "name")
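For context, the reason this skips the bad records: Try(...).toOption turns a failing field lookup into None, and flatMap drops the Nones. A minimal sketch of the same pattern on plain Scala collections (the sample lines are made up for illustration):

import scala.util.Try

// Try(...).toOption yields None for rows that throw, and flatMap drops the Nones.
val lines  = Seq("1,a,b,c,d,e,f,g,h,i,j", "2,short")
val fields = lines.map(_.split(","))

val pairs = fields.flatMap { arr =>
  Try((arr(1), arr(10))).toOption   // None when index 10 is out of bounds
}
// pairs == Seq(("a", "j")) -- the short record is silently skipped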
Upvotes: 1
Reputation: 41957
You can use Try as below and filter out the invalid records later on:
val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
val rowRDMapped = rowRDD.map(x => (Try(x.get(1).toString) getOrElse "blank", Try(x.get(10).toString) getOrElse "blank"))
val DF = rowRDMapped.toDF("ID", "name").filter($"name" =!= "blank")
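Note that toDF on an RDD and the $"name" column syntax both need the Spark implicits in scope; assuming Spark 2.x and a SparkSession named spark (the name is an assumption here), that would be:

import spark.implicits._  // brings in toDF on RDDs and the $"..." column syntax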
Upvotes: 1
Reputation: 1766
The simplest approach:
val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
val rowRDMapped = rowRDD.filter(_.length >= 11).map(x => (x.get(1), x.get(10)))
It is better to use collect with a partial function (not to be confused with the parameterless collect() that returns all the data to the driver):
val rowRDMapped = rowRDD.collect { case x if x.length >= 11 => (x.get(1), x.get(10)) }
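collect with a partial function filters and maps in one pass: rows that don't match the guard are simply dropped. A minimal sketch of the same idea on plain Scala collections, with made-up sample rows:

// Rows shorter than 11 fields don't match the guard and are dropped;
// matching rows are mapped to an (ID, name) pair in the same step.
val rows: Seq[Array[String]] = Seq(
  "1,a,b,c,d,e,f,g,h,i,j".split(","),
  "2,short".split(",")
)

val pairs = rows.collect {
  case x if x.length >= 11 => (x(1), x(10))
}
// pairs == Seq(("a", "j"))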
Upvotes: 1