karthik subramanian

Reputation: 183

sortByKey() function in Spark Scala not working properly

I am trying to run a sortByKey() function on the following text file.

EMP_NAME EMP_ID SALARY
Adam     22      100
Bob      25      102
Bob      28      104
Chris    29      110

I am using EMP_NAME as the key for this text file. I run the following command: textFile.sortByKey(), but I get the following output:

Bob
Bob
Adam
Chris
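
A keyed RDD like the one described can be built roughly as follows (a sketch; the exact keying code is not shown in the question, so the file name and parsing below are assumptions):

val textFile = sc.textFile("employees.txt")        // hypothetical file name
  .filter(line => !line.startsWith("EMP_NAME"))    // drop the header row
  .map { line =>
    val cols = line.split("\\s+")
    (cols(0), (cols(1).toInt, cols(2).toInt))      // key by EMP_NAME
  }

textFile.sortByKey()                               // returns a new, sorted RDD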

Help is appreciated. Thank you.

Upvotes: 2

Views: 3034

Answers (3)

Anchit

Reputation: 68

Python:

 sc.parallelize([['Chris',29,110],['Bob',28,104],['Bob',25,102],['Adam',22,100]]).groupBy(lambda x: x[0]).sortByKey().flatMap(lambda x: list(x[1])).collect()

[['Adam', 22, 100], ['Bob', 25, 102], ['Bob', 28, 104], ['Chris', 29, 110]]

Scala:

sc.parallelize(List(Array("Chris",29,110),Array("Bob",28,104),Array("Bob",25,102),Array("Adam",22,100))).groupBy(x => x(0).asInstanceOf[String]).sortByKey().flatMap(x=> x._2).collect()

Array[Array[Any]] = Array(Array(Adam, 22, 100), Array(Bob, 28, 104), Array(Bob, 25, 102), Array(Chris, 29, 110))

You may want to put the other columns into the key if you want them included in the sorting criteria. In the example above, the two Bob rows are not ordered by the second column.
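
For example, a composite (name, id) key gives a secondary sort on EMP_ID (a sketch using hypothetical data, not part of the original answer):

// key by (EMP_NAME, EMP_ID) so ties on the name are ordered by the id
val rows = sc.parallelize(List(("Chris", 29, 110), ("Bob", 28, 104),
                               ("Bob", 25, 102), ("Adam", 22, 100)))

rows
  .keyBy { case (name, id, _) => (name, id) }   // composite key
  .sortByKey()                                  // tuple keys sort lexicographically
  .values
  .collect()
// Array((Adam,22,100), (Bob,25,102), (Bob,28,104), (Chris,29,110))

Tuple keys work here because Scala provides an Ordering for tuples of ordered types; rows.sortBy with the same tuple function would achieve the same result.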

Upvotes: 0

Kshitij Kulshrestha

Reputation: 2072

Here is a dataset and the code that sorts it by key. If this doesn't solve it, please post your code and we will look into the issue.

Data -> (tab-separated file)

EMP_NAME    EMP_ID  SALARY
Adam    22  100
Bob 25  102
Bob 28  104
Chris   29  110

Code ->

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

/*
 * @author Kshitij Kulshrestha
 */

object A1 {
  def main(args: Array[String]): Unit = {

    // set up environment
    val sparkHome = "/usr/spark_pack/spark-1.4.1-bin-hadoop2.4/"
    val sparkMasterUrl = "spark://SYSTEMX:7077"

    val conf = new SparkConf()
      .setAppName("A1")
      .setMaster("local[2]")
      .setSparkHome(sparkHome)

    val sc = new SparkContext(conf)

    // drop the header line, then key each record by EMP_NAME
    val dataRDD = sc.textFile("src/Source/A1_data")
      .filter { !_.contains("EMP_NAME") }
      .map { x =>
        val temp = x.split("\t")
        (temp(0), (temp(1), temp(2)))
      }

    // pull everything into one partition, then sort by the EMP_NAME key
    val sortedDataRDD = dataRDD.coalesce(1).sortByKey()
    sortedDataRDD.foreach(println)
  }
}

Output ->

(Adam,(22,100))
(Bob,(25,102))
(Bob,(28,104))
(Chris,(29,110))

Upvotes: 0

Ajay Gupta

Reputation: 3212

If you are using a SparkConf configured as

val conf = new SparkConf().setMaster("local")

then the number of partitions created by default is 1.

But if you are using

val conf = new SparkConf().setMaster("local[*]")

and there are extra cores available, Spark will partition the data across them so that tasks can run in parallel.

To get the number of partitions Spark has created:

println(textFile.partitions.length)
// For my machine it was 2
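
To see how the sorted keys end up distributed across those partitions, glom() is handy: it turns each partition into an array (a diagnostic sketch; textFile here is the keyed RDD from the question):

textFile.sortByKey()
  .glom()          // one array per partition
  .collect()
  .foreach(partition => println(partition.mkString(", ")))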

When the data is spread across multiple partitions, sortByKey sorts each partition (every output partition holds a contiguous range of keys), but foreach prints each partition's output as its task finishes, so the printed lines can appear out of order. To avoid this, you can pass numPartitions = 1 to sortByKey so that all the data ends up in a single partition and is sorted and printed as one unit.

textFile.sortByKey(numPartitions = 1).foreach(println)

This forces the result into a single partition, so you get properly sorted output across the whole input.
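
Alternatively, if the sorted result is small enough to fit on the driver, you can keep multiple partitions for the sort and only serialize the printing (a sketch, not part of the original answer):

// collect() returns the partitions in key order, so the driver prints a fully
// sorted listing even when the sort itself ran on several partitions
textFile.sortByKey().collect().foreach(println)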

Upvotes: 9
