Reputation: 186
I am working on a small project for converting student data to intervals. The program simply reads the data and selects the marks (integers) from the marks column, to convert them to intervals after sorting them in ascending order. Can anyone help me with this particular part? Many thanks.
The code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
case class Rating(mark: Int, classes: String, schooles: String, name: String)
val Result = sc.textFile("hdfs://schools:9000/input/marks.csv")
  .map(_.split(","))
  .map(p => Rating(p(0).toInt, p(1).trim, p(2).trim, p(3).trim))
  .toDF
val all_marks = Result.groupBy("classes", "schooles", "name")
  .agg(collect_list("mark") as "marks", count("*") as "cnt")
  .where($"cnt" > 10)
val mrk=all_marks.select("marks")
The part I need help with:
mrk.foreach(
var ascending=mrk.sort
var interval=ascending[0]+"-"+ascending[ascending.size]
)
How can I read the marks row by row, so I can sort them and convert them to an interval?
Upvotes: 1
Views: 1350
Reputation: 4045
I needed to use a WrappedArray to get the UDF working, like this:
case class Rating(mark: Int, classes: String, schooles: String, name: String)
val Result = sc.parallelize(Seq(
  Rating(56, "classA", "SchoolA", "English"),
  Rating(57, "classB", "SchoolA", "English"),
  Rating(58, "classA", "SchoolA", "English"),
  Rating(59, "classB", "SchoolA", "English"),
  Rating(60, "classA", "SchoolA", "English"),
  Rating(61, "classA", "SchoolA", "English"))).toDF()
val toInterval = udf((marks: scala.collection.mutable.WrappedArray[Int]) => s"${marks.min}-${marks.max}")
val all_marks = Result.groupBy("classes", "schooles", "name").agg(collect_list("mark") as "marks", count("*") as "cnt")
all_marks.select("marks").withColumn("interval", toInterval(col("marks"))).show()
Output:
+----------------+--------+
| marks|interval|
+----------------+--------+
|[56, 58, 60, 61]| 56-61|
| [57, 59]| 57-59|
+----------------+--------+
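As a side note, the parameter can also be declared as the more general Seq[Int], since the WrappedArray that Spark passes in conforms to Seq. A minimal sketch against the same all_marks (toIntervalSeq is just an illustrative name):
// Seq[Int] also binds to the array<int> column, because WrappedArray is a Seq
val toIntervalSeq = udf((marks: Seq[Int]) => s"${marks.min}-${marks.max}")
all_marks.select("marks").withColumn("interval", toIntervalSeq(col("marks"))).show()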
Upvotes: 1
Reputation: 858
One way to get this result is to convert the DataFrame to an RDD with a List-typed column, apply a map function, and convert the RDD back to a DataFrame:
mrk.rdd.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")
Note: getList[Int] returns Java's java.util.List type; to convert it into Scala's List we have to use the toList method and import scala.collection.JavaConversions._.
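If you would rather avoid the implicit conversions of JavaConversions, the explicit scala.collection.JavaConverters API does the same job; a minimal sketch, assuming the same mrk DataFrame as above:
import scala.collection.JavaConverters._
// getList returns java.util.List; asScala wraps it as a Scala Buffer, toList copies it
mrk.rdd.map(_.getList[Int](0).asScala.toList).map(l => s"${l.min} - ${l.max}").toDF("marks")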
The Dataset API may also be used instead of the RDD:
mrk.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")
Upvotes: 1
Reputation: 23109
You can create a user-defined function to build a new field as an interval from your list.
Here is a simple example, as you have already calculated the marks column:
import org.apache.spark.sql.functions._
val ddf1 = Seq(List(2,3,1), List(6,4,3)).toDF("marks")
val testUdf = udf((list: Seq[Int]) => {
  val ascending = list.sorted // sorts in ascending order
  s"${ascending(0)} - ${ascending(ascending.size - 1)}"
})
ddf1.withColumn("marks", testUdf($"marks")).show()
Output:
+-----+
|marks|
+-----+
|1 - 3|
|3 - 6|
+-----+
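As an aside, on Spark 2.4+ the same interval can be built without a UDF at all, using the built-in array_min and array_max functions (a minimal sketch against the same ddf1; the Spark version requirement is the one assumption here):
// array_min/array_max pick the endpoints directly; concat_ws joins them as a string
ddf1.withColumn("marks", concat_ws(" - ", array_min($"marks").cast("string"), array_max($"marks").cast("string"))).show()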
Hope this helps!
Upvotes: 2