Mohd Zoubi

Reputation: 186

How to read rows from columns in Scala

I am working on a small project that converts student marks to intervals. The program reads the data, selects the marks (integers) from the marks column, and converts them to intervals after sorting them in ascending order. Can anyone help me with this particular part? Many thanks.

The code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row


case class Rating(mark: Int, classes: String, schooles: String, name: String)

val Result = sc.textFile("hdfs://schools:9000/input/marks.csv")
  .map(_.split(","))
  .map(p => Rating(p(0).toInt, p(1).trim, p(2).trim, p(3).trim))
  .toDF

val all_marks = Result.groupBy("classes", "schooles", "name")
  .agg(collect_list("mark") as "marks", count("*") as "cnt")
  .where($"cnt" > 10)

val mrk = all_marks.select("marks")

The part I need help with (this is pseudocode; it doesn't compile):

mrk.foreach(
    var ascending=mrk.sort
    var interval=ascending[0]+"-"+ascending[ascending.size]
)

How can I read the marks row by row, so that I can sort them and convert them to an interval?

Upvotes: 1

Views: 1350

Answers (3)

jsdeveloper

Reputation: 4045

I needed to declare the UDF's parameter as a WrappedArray to get it working, since collect_list produces an array column that Spark passes to a Scala UDF as a scala.collection.mutable.WrappedArray:

import org.apache.spark.sql.functions._

case class Rating(mark: Int, classes: String, schooles: String, name: String)

val Result = sc.parallelize(Seq(
  Rating(56, "classA", "SchoolA", "English"),
  Rating(57, "classB", "SchoolA", "English"),
  Rating(58, "classA", "SchoolA", "English"),
  Rating(59, "classB", "SchoolA", "English"),
  Rating(60, "classA", "SchoolA", "English"),
  Rating(61, "classA", "SchoolA", "English"))).toDF()


val toInterval = udf((marks: scala.collection.mutable.WrappedArray[Int]) => s"${marks.min}-${marks.max}")

val all_marks = Result.groupBy("classes", "schooles", "name")
  .agg(collect_list("mark") as "marks", count("*") as "cnt")

all_marks.select("marks").withColumn("interval", toInterval(col("marks"))).show()

Output:

+----------------+--------+
|           marks|interval|
+----------------+--------+
|[56, 58, 60, 61]|   56-61|
|        [57, 59]|   57-59|
+----------------+--------+
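
Note that since only the endpoints of the interval are needed, taking min and max of the collected array avoids sorting it at all.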

Upvotes: 1

Alex Chermenin

Reputation: 858

Another way to get the same result is to convert the data frame to an RDD of Lists, apply a map function, and then convert the RDD back to a data frame:

import scala.collection.JavaConversions._

mrk.rdd.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")

Note: getList[Int] returns a java.util.List, so to turn it into a Scala List we call toList after importing scala.collection.JavaConversions._, which provides the implicit conversion.
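
For illustration, a minimal standalone sketch of just that conversion, with made-up values:

import scala.collection.JavaConversions._

val javaList: java.util.List[Int] = java.util.Arrays.asList(56, 58, 60)
val scalaList: List[Int] = javaList.toList // implicit asScalaBuffer, then toList
println(s"${scalaList.min} - ${scalaList.max}") // prints 56 - 60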

The Dataset API can also be used instead of the RDD:

mrk.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")
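
Note that the Dataset map needs an implicit Encoder in scope. The spark-shell imports these automatically; in a standalone application you would add the following yourself (assuming your SparkSession is named spark):

import spark.implicits._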

Upvotes: 1

koiralo

Reputation: 23109

You can create a user-defined function to build a new field containing the interval from your list.

Here is a simple example, given that you have already calculated the marks column:

import org.apache.spark.sql.functions._
val ddf1 = Seq(List(2,3,1), List(6,4,3)).toDF("marks")

val testUdf = udf((list: Seq[Int]) => {
  val ascending = list.sorted // sorts in ascending order
  s"${ascending.head} - ${ascending.last}" // smallest and largest mark
})

ddf1.withColumn("marks", testUdf($"marks")).show()

Output:

+-----+
|marks|
+-----+
|1 - 3|
|3 - 6|
+-----+

Hope this helps!

Upvotes: 2
