Thelight

Reputation: 369

Apache Spark Scala exception handling

How do I handle exceptions for invalid records in Spark (Scala)? Here is my code:

val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
val rowRDMapped = rowRDD.map { x => (x.getString(1), x.getString(10)) }
val DF = rowRDMapped.toDF("ID", "name")

Everything works when the input data is well formed. If a line doesn't have enough fields, I get an ArrayIndexOutOfBoundsException.

I tried wrapping the mapping in try-catch, but I can't work out how to skip the records with invalid data that way:

val rowRDMapped = rowRDD.map { x =>
  try {
    (x.getString(1), x.getString(10))
  } catch {
    case e: ArrayIndexOutOfBoundsException =>
      println("Invalid Data")
      // A value is expected here, but I am not sure what to return, since I don't want any data returned for this record.
  }
}

Please let me know how to solve this with try-catch. If there is a better solution, that would help a lot too.

Upvotes: 0

Views: 2377

Answers (3)

Kunal Thakare

Reputation: 31

Instead of try-catch, you can use scala.util.Try.

The code below drops the lines that don't have enough fields and builds a DataFrame from the remaining ones.

import scala.util.Try

val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
// Try(...).toOption is None for short rows, and flatMap drops the Nones
val rowRDMapped = rowRDD.flatMap { x => Try((x.getString(1), x.getString(10))).toOption }
val DF = rowRDMapped.toDF("ID", "name")
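
To see the Try/flatMap idea in isolation, here is a minimal sketch that uses a plain Scala Seq in place of the RDD, so it runs without a Spark context. The sample lines and the `parse` helper are made up for illustration and are not from the question.

```scala
import scala.util.Try

// Hypothetical sample lines standing in for the contents of the input file.
val lines = Seq(
  "a0,a1,a2,a3,a4,a5,a6,a7,a8,a9,a10", // 11 fields: valid
  "b0,b1,b2"                           // 3 fields: index 10 is out of range
)

// Hypothetical helper: Some((id, name)) for a valid line, None when an index is missing.
def parse(line: String): Option[(String, String)] = {
  val fields = line.split(",")
  Try((fields(1), fields(10))).toOption
}

// flatMap keeps the contents of each Some and silently drops each None.
val parsed = lines.flatMap(parse)
```

Note that Try catches every non-fatal exception, not only the out-of-range index, so unrelated failures in the same expression would be dropped just as silently.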

Upvotes: 1

Ramesh Maharjan

Reputation: 41957

You can wrap each field access in Try, fall back to a placeholder value, and filter the placeholders out afterwards:

val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
val rowRDMapped = rowRDD.map(x => (Try(x.get(1).toString) getOrElse "blank", Try(x.get(10).toString) getOrElse "blank"))
val DF = rowRDMapped.toDF("ID", "name").filter($"name" =!= "blank")
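
One thing to keep in mind with a placeholder: a record whose real value equals the placeholder is filtered out as well. The sketch below shows this on a plain Scala Seq instead of an RDD and DataFrame, so it runs without Spark. The sample lines are invented for illustration.

```scala
import scala.util.Try

// Hypothetical sample lines; the third one is valid but its last field is literally "blank".
val lines = Seq(
  "a0,a1,a2,a3,a4,a5,a6,a7,a8,a9,a10",
  "b0,b1,b2",
  "c0,c1,c2,c3,c4,c5,c6,c7,c8,c9,blank"
)

// Same shape as the answer: a missing field becomes the placeholder "blank".
val pairs = lines.map { line =>
  val fields = line.split(",")
  (Try(fields(1)).getOrElse("blank"), Try(fields(10)).getOrElse("blank"))
}

// Filtering on the placeholder removes the short line and also the valid third line.
val kept = pairs.filter(_._2 != "blank")
```

If that collision matters for your data, pick a placeholder that cannot occur in the input, or use an Option-based approach instead.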

Upvotes: 1

Zernike

Reputation: 1766

The simplest approach is to filter on the row length first:

val rawData = sc.textFile(file)
val rowRDD = rawData.map(line => Row.fromSeq(line.split(",")))
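val rowRDMapped = rowRDD.filter(_.length >= 11).map(x => (x.get(1), x.get(10)))

It is better to use collect with a partial function, which filters and maps in one step (not to be confused with the no-argument collect that returns all elements to the driver):

val rowRDMapped = rowRDD.collect { case x if x.length >= 11 => (x.get(1), x.get(10)) }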
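
The same guard-based pattern can be tried on a plain Scala Seq, whose collect also takes a partial function, so this sketch runs without Spark. The sample lines are assumptions for illustration only.

```scala
// Hypothetical sample lines standing in for the contents of the input file.
val lines = Seq(
  "a0,a1,a2,a3,a4,a5,a6,a7,a8,a9,a10", // 11 fields: kept
  "b0,b1,b2"                           // 3 fields: rejected by the guard
)

// The guard skips short records, so the indexing on the right-hand side never throws.
val pairs = lines.map(_.split(",")).collect {
  case fields if fields.length >= 11 => (fields(1), fields(10))
}
```

Unlike the Try-based versions, no exception is thrown and caught here, and the validity rule (at least 11 fields) is stated explicitly.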

Upvotes: 1
