Reputation: 3184
Here is a sample file:
Department,Designation,costToCompany,State
Sales,Trainee,12000,UP
Sales,Lead,32000,AP
Sales,Lead,32000,LA
Sales,Lead,32000,TN
Sales,Lead,32000,AP
Sales,Lead,32000,TN
Sales,Lead,32000,LA
Sales,Lead,32000,LA
Marketing,Associate,18000,TN
Marketing,Associate,18000,TN
HR,Manager,58000,TN
Produce the output as CSV,
grouped by Department, Designation, and State,
with additional columns for sum(costToCompany) and sum(TotalEmployeeCount).
The result should look like:
Dept,Desg,state,empCount,totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
The following is my solution; writing the results to a file fails with an error. What am I doing wrong here?
Step #1: Load file
scala> val file = sc.textFile("data/sales.txt")
Step #2: Create a case class to represent the data
scala> case class emp(Dept:String, Desg:String, totalCost:Double, State:String)
defined class emp
Step #3: Split the data and create an RDD of emp objects
scala> val fileSplit = file.map(_.split(","))
scala> val data = fileSplit.map(x => emp(x(0), x(1), x(2).toDouble, x(3)))
Step #4: Turn the data into key/value pairs with key=(dept, desg, state) and value=(1, totalCost)
scala> val keyVals = data.map(x => ((x.Dept,x.Desg,x.State),(1,x.totalCost)))
Step #5: Aggregate with reduceByKey, since we want both the total number of employees and the total cost per key
scala> val results = keyVals.reduceByKey{(a,b) => (a._1+b._1, a._2+b._2)} //(a.count+ b.count, a.cost+b.cost)
results: org.apache.spark.rdd.RDD[((String, String, String), (Int, Double))] = ShuffledRDD[41] at reduceByKey at <console>:55
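For reference, the combining logic of Step #5 can be checked without a cluster. The following is only a sketch: plain Scala collections stand in for the RDD, and the names `ReduceByKeySketch`, `combine`, and `reduceByKeyLocal` are hypothetical helpers, not part of Spark.

```scala
// Plain Scala collections stand in for the RDD; no Spark is needed to run this.
object ReduceByKeySketch {
  type Key = (String, String, String) // (Dept, Desg, State)
  type Agg = (Int, Double)            // (empCount, totalCost)

  // Same combining function as the one passed to reduceByKey in Step #5.
  def combine(a: Agg, b: Agg): Agg = (a._1 + b._1, a._2 + b._2)

  // Local stand-in for reduceByKey: group the pairs by key, then reduce each group's values.
  def reduceByKeyLocal(pairs: Seq[(Key, Agg)]): Map[Key, Agg] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(combine) }

  def main(args: Array[String]): Unit = {
    // Three rows taken from the sample file, already in the key/value shape of Step #4.
    val keyVals = Seq(
      (("Sales", "Lead", "AP"), (1, 32000.0)),
      (("Sales", "Lead", "AP"), (1, 32000.0)),
      (("HR", "Manager", "TN"), (1, 58000.0))
    )
    reduceByKeyLocal(keyVals).foreach(println)
  }
}
```

Each value starts as (1, cost), so summing the first components yields the employee count and summing the second yields the total cost for that key.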
Step #6: save the results
scala> results.repartition(1).saveAsTextFile("data/result")
Error
17/08/16 22:16:59 ERROR executor.Executor: Exception in task 0.0 in stage 20.0 (TID 23)
java.lang.NumberFormatException: For input string: "costToCompany"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)
at java.lang.Double.parseDouble(Double.java:540)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
at $line85.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:51)
at $line85.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
17/08/16 22:16:59 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 20.0 (TID 23, localhost, executor driver): java.lang.NumberFormatException: For input string: "costToCompany"
Update 1: I forgot to remove the header; the updated code is below. Saving now throws a different error. I also need to put the header back in the output file.
scala> val file = sc.textFile("data/sales.txt")
scala> val header = fileSplit.first()
scala> val noHeaderData = fileSplit.filter(_(0) != header(0))
scala> case class emp(Dept:String, Desg:String, totalCost:Double, State:String)
scala> val data = noHeaderData.map(x => emp(x(0), x(1), x(2).toDouble, x(3)))
scala> val keyVals = data.map(x => ((x.Dept,x.Desg,x.State),(1,x.totalCost)))
scala> val resultSpecific = results.map(x => (x._1._1, x._1._2, x._1._3, x._2._1, x._2._2))
scala> resultSpecific.repartition(1).saveASTextFile("data/specific")
<console>:64: error: value saveASTextFile is not a member of org.apache.spark.rdd.RDD[(String, String, String, Int, Double)]
resultSpecific.repartition(1).saveASTextFile("data/specific")
Upvotes: 0
Reputation: 5710
The error is straightforward. It says:
<console>:64: error: value saveASTextFile is not a member of org.apache.spark.rdd.RDD[(String, String, String, Int, Double)]
resultSpecific.repartition(1).saveASTextFile("data/specific")
There is no method called saveASTextFile(...); the method is saveAsTextFile(...). You have a capitalization error in the method name.
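Once the name is corrected, note that saveAsTextFile writes each element using its toString, so a tuple is written with surrounding parentheses rather than as a bare CSV row. To get the layout from the question, including the header, one option is to format each tuple as a line first. This is a minimal sketch on plain Scala collections; `CsvLinesSketch`, `toCsvLine`, and `withHeader` are hypothetical names, and it assumes costs are whole numbers so they can be printed without a decimal part.

```scala
object CsvLinesSketch {
  // Header as given in the expected output of the question.
  val header = "Dept,Desg,state,empCount,totalCost"

  // Formats one aggregated row as a CSV line.
  // Assumption: totalCost is a whole number, so toLong drops only a ".0".
  def toCsvLine(row: (String, String, String, Int, Double)): String = {
    val (dept, desg, state, empCount, totalCost) = row
    Seq(dept, desg, state, empCount.toString, totalCost.toLong.toString).mkString(",")
  }

  // Puts the header line in front of the formatted rows.
  def withHeader(rows: Seq[(String, String, String, Int, Double)]): Seq[String] =
    header +: rows.map(toCsvLine)

  def main(args: Array[String]): Unit =
    withHeader(Seq(("Sales", "Lead", "AP", 2, 64000.0))).foreach(println)
}
```

With an RDD, the same `toCsvLine` function could be passed to `resultSpecific.map` before calling saveAsTextFile; how the header line is prepended there depends on how the output is partitioned.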
Upvotes: 0
Reputation: 28392
To answer your question as well as comments:
It would be easier to use DataFrames in this case. Since your file is in CSV format, you can load and save the data as shown below. This way you do not need to split the rows yourself or handle the header, either when loading or when saving.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val df = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true") // use the first line as column names
  .load("csv/file/path")
The DataFrame column names will then match the header in the file. Instead of reduceByKey(), you can use the DataFrame's groupBy() and agg():
import org.apache.spark.sql.functions.{count, sum}

val res = df.groupBy($"Department", $"Designation", $"State")
  .agg(count($"costToCompany").alias("empCount"), sum($"costToCompany").alias("totalCost"))
Then save it:
res.coalesce(1)
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("results.csv")
Upvotes: 4
Reputation: 408
When you try to convert the column to Double, the header string costToCompany cannot be parsed, which is why the job fails as soon as an action is triggered. Drop the first record (the header) from the file and it will work. You can also do this with a DataFrame, which is easier.
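As an illustration of that fix, here is a minimal sketch of a parser that skips the header instead of throwing. It uses plain Scala only; `DropHeaderSketch`, `Emp`, and `parse` are hypothetical names, and treating every row with a non-numeric cost as skippable is an assumption that may hide genuinely bad data.

```scala
import scala.util.Try

object DropHeaderSketch {
  final case class Emp(dept: String, desg: String, totalCost: Double, state: String)

  // Returns None for the header (or any row whose third field is not a number)
  // instead of throwing NumberFormatException.
  def parse(line: String): Option[Emp] = {
    val f = line.split(",").map(_.trim)
    if (f.length != 4) None
    else Try(f(2).toDouble).toOption.map(cost => Emp(f(0), f(1), cost, f(3)))
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "Department,Designation,costToCompany,State", // header row from the sample file
      "Sales,Trainee,12000,UP",
      "HR,Manager,58000,TN"
    )
    // flatMap keeps only the rows that parsed successfully.
    lines.flatMap(l => parse(l)).foreach(println)
  }
}
```

The same `parse` function should be usable with an RDD's flatMap in place of the split and map in Steps #3 of the question, though that has not been verified against a specific Spark version.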
Upvotes: 2