Reputation: 33
I am trying to remove header from given input file. But I couldn't make it. Th is what I have written. Can someone help me how to remove headers from the txt or csv file.
import org.apache.spark.{SparkConf, SparkContext}
object SalesAmount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(getClass.getName).setMaster("local")
val sc = new SparkContext(conf)
val salesRDD = sc.textFile(args(0),2)
val salesPairRDD = salesRDD.map(rec => {
val fieldArr = rec.split(",")
(fieldArr(1), fieldArr(3).toDouble)
})
val totalAmountRDD = salesPairRDD.reduceByKey(_+_).sortBy(_._2,false)
val discountAmountRDD = totalAmountRDD.map(t => {
if (t._2 > 1000) (t._1,t._2 * 0.9)
else t
})
discountAmountRDD.foreach(println)
}
}
Upvotes: 0
Views: 812
Reputation: 74395
Skipping the first row when manually parsing text files using the RDD API is a bit tricky:
val salesPairRDD =
salesRDD
.mapPartitionsWithIndex((i, it) => if (i == 0) it.drop(1) else it)
.map(rec => {
val fieldArr = rec.split(",")
(fieldArr(1), fieldArr(3).toDouble)
})
The header line will be the first item in the first partition, so mapPartitionsWithIndex
is used to iterate over the partitions and to skip the first item if the partition index is 0
.
Upvotes: 1