alnkr

Reputation: 33

Remove header from CSV while reading from a txt or CSV file in Spark Scala

I am trying to remove the header from the given input file, but I couldn't make it work. This is what I have written. Can someone help me with how to remove the header from the txt or CSV file?

import org.apache.spark.{SparkConf, SparkContext}

object SalesAmount {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName(getClass.getName).setMaster("local")
    val sc = new SparkContext(conf)

    val salesRDD = sc.textFile(args(0), 2)

    val salesPairRDD = salesRDD.map(rec => {
      val fieldArr = rec.split(",")
      (fieldArr(1), fieldArr(3).toDouble)
    })

    val totalAmountRDD = salesPairRDD.reduceByKey(_ + _).sortBy(_._2, false)

    val discountAmountRDD = totalAmountRDD.map(t => {
      if (t._2 > 1000) (t._1, t._2 * 0.9)
      else t
    })

    discountAmountRDD.foreach(println)
  }

}

Upvotes: 0

Views: 812

Answers (1)

Hristo Iliev

Reputation: 74395

Skipping the first row when manually parsing text files using the RDD API is a bit tricky:

val salesPairRDD =
  salesRDD
    .mapPartitionsWithIndex((i, it) => if (i == 0) it.drop(1) else it) // skip header in first partition
    .map(rec => {
      val fieldArr = rec.split(",")
      (fieldArr(1), fieldArr(3).toDouble)
    })

The header line will be the first item in the first partition, so mapPartitionsWithIndex is used to iterate over the partitions and to skip the first item if the partition index is 0.
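The same per-partition logic can be sketched with plain Scala collections (no Spark needed) to see why only the first partition loses a line. The partition contents below are made-up sample data, not from the question:

```scala
// Drop the first record only when the partition index is 0,
// mirroring what mapPartitionsWithIndex does in the RDD version.
def dropHeader(partitions: Seq[Seq[String]]): Seq[String] =
  partitions.zipWithIndex.flatMap {
    case (part, 0) => part.drop(1) // partition 0 starts with the header
    case (part, _) => part         // later partitions are pure data
  }

// Simulated partitions: the header always lands in partition 0.
val partitions = Seq(
  Seq("product,region,qty,amount", "a,East,1,500.0", "b,West,2,1500.0"),
  Seq("c,East,3,800.0", "d,West,4,1200.0")
)

val rows = dropHeader(partitions)
rows.foreach(println) // four data rows, header removed
```

Note that this relies on `textFile` putting the header at the start of partition 0, which holds because Spark reads file splits in order.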

Upvotes: 1
