learning...

Reputation: 3184

Spark reducer and summation result issue

Here is a sample file:

Department,Designation,costToCompany,State

    Sales,Trainee,12000,UP
    Sales,Lead,32000,AP
    Sales,Lead,32000,LA
    Sales,Lead,32000,TN
    Sales,Lead,32000,AP
    Sales,Lead,32000,TN 
    Sales,Lead,32000,LA
    Sales,Lead,32000,LA
    Marketing,Associate,18000,TN
    Marketing,Associate,18000,TN
    HR,Manager,58000,TN

The output should be produced as CSV.

The result should look like this:

Dept,Desg,state,empCount,totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000

The following is my solution, but writing to a file results in an error. What am I doing wrong here?

Step #1: Load the file

val file = sc.textFile("data/sales.txt")

Step #2: Create a case class to represent the data

scala> case class emp(Dept:String, Desg:String, totalCost:Double, State:String)
defined class emp

Step #3: Split the data and create an RDD of emp objects

scala> val fileSplit = file.map(_.split(","))
scala> val data = fileSplit.map(x => emp(x(0), x(1), x(2).toDouble, x(3)))

Step #4: Turn the data into key/value pairs with key=(dept, desg, state) and value=(1, totalCost)

scala> val keyVals = data.map(x => ((x.Dept,x.Desg,x.State),(1,x.totalCost)))

Step #5: Aggregate with reduceByKey, since we want both the number of employees and the total cost summed per key

scala> val results = keyVals.reduceByKey{(a,b) => (a._1+b._1, a._2+b._2)} //(a.count+ b.count, a.cost+b.cost)
results: org.apache.spark.rdd.RDD[((String, String, String), (Int, Double))] = ShuffledRDD[41] at reduceByKey at <console>:55

Step #6: Save the results

scala> results.repartition(1).saveAsTextFile("data/result")

Error

17/08/16 22:16:59 ERROR executor.Executor: Exception in task 0.0 in stage 20.0 (TID 23)
java.lang.NumberFormatException: For input string: "costToCompany"
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)
    at java.lang.Double.parseDouble(Double.java:540)
    at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
    at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
    at $line85.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:51)
    at $line85.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
17/08/16 22:16:59 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 20.0 (TID 23, localhost, executor driver): java.lang.NumberFormatException: For input string: "costToCompany"

Update 1: I forgot to remove the header. The updated code is below. Save is now throwing a different error. Also, I need to put the header back in the output file.

scala> val file = sc.textFile("data/sales.txt")
scala> val fileSplit = file.map(_.split(","))
scala> val header = fileSplit.first()
scala> val noHeaderData = fileSplit.filter(_(0) != header(0))
scala> case class emp(Dept:String, Desg:String, totalCost:Double, State:String)
scala> val data = noHeaderData.map(x => emp(x(0), x(1), x(2).toDouble, x(3)))
scala> val keyVals = data.map(x => ((x.Dept,x.Desg,x.State),(1,x.totalCost)))
scala> val results = keyVals.reduceByKey{(a,b) => (a._1+b._1, a._2+b._2)}
scala> val resultSpecific = results.map(x => (x._1._1, x._1._2, x._1._3, x._2._1, x._2._2))
scala> resultSpecific.repartition(1).saveASTextFile("data/specific")
<console>:64: error: value saveASTextFile is not a member of org.apache.spark.rdd.RDD[(String, String, String, Int, Double)]
          resultSpecific.repartition(1).saveASTextFile("data/specific")
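
For the header part, one possible approach (a minimal sketch, assuming the resultSpecific RDD from the update above) is to format each tuple as a CSV line, prepend a one-line header RDD, and save:

val headerLine = sc.parallelize(Seq("Dept,Desg,state,empCount,totalCost"))
val csvLines = resultSpecific.map { case (dept, desg, state, empCount, totalCost) =>
  s"$dept,$desg,$state,$empCount,$totalCost"
}
// coalesce(1) without a shuffle concatenates partitions in index order,
// so the single header partition comes out first
headerLine.union(csvLines).coalesce(1).saveAsTextFile("data/specific")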

Upvotes: 0

Views: 489

Answers (3)

Balaji Reddy

Reputation: 5710

The error is straightforward; it says:

:64: error: value saveASTextFile is not a member of org.apache.spark.rdd.RDD[(String, String, String, Int, Double)]
      resultSpecific.repartition(1).saveASTextFile("data/specific")

In fact, there is no method called saveASTextFile(...); the method is saveAsTextFile(...). You have a case error in your method name.
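
For reference, here is the same line from the question with only the method name corrected:

resultSpecific.repartition(1).saveAsTextFile("data/specific")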

Upvotes: 0

Shaido

Reputation: 28392

To answer your question as well as the comments:

It would be easier to use DataFrames in this case. Since your file is in CSV format, you can load and save the data as shown below; that way you do not need to split the rows yourself or take care of the header (both when loading and saving).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val df = spark.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // read the header line as column names
        .load("csv/file/path")

The DataFrame column names will then be the same as the header in the file. Instead of reduceByKey() you can use the DataFrame's groupBy() and agg():

import org.apache.spark.sql.functions.{count, sum}

val res = df.groupBy($"Department", $"Designation", $"State")
  .agg(count($"costToCompany").alias("empCount"), sum($"costToCompany").alias("totalCost"))

Then save it:

res.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("results.csv")

Upvotes: 4

Rishikesh Teke

Reputation: 408

When you try to cast to Double, the header string "costToCompany" won't cast, which is why the job fails as soon as an action fires. Just drop the first record from the file and it will work. You can also do this kind of operation on a DataFrame, which is easier.
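
A minimal sketch of dropping that first record (assuming the file RDD from the question; mapPartitionsWithIndex is part of the standard RDD API):

val noHeader = file.mapPartitionsWithIndex { (idx, iter) =>
  // the header is the first line of the first partition
  if (idx == 0) iter.drop(1) else iter
}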

Upvotes: 2
