Reputation: 183
I am trying to run the sortByKey() function on the following text file.
EMP_NAME EMP_ID SALARY
Adam 22 100
Bob 25 102
Bob 28 104
Chris 29 110
I am taking EMP_NAME as the key for the above text file, and I am running the following command:
textFile.sortByKey()
I am getting the following output:
Bob
Bob
Adam
Chris
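For context, this is roughly how I am keying the data (simplified; the path and parsing in my actual code may differ):
val textFile = sc.textFile("employees.txt")
  .filter(!_.startsWith("EMP_NAME"))     // drop the header line
  .map { line =>
    val cols = line.split("\t")
    (cols(0), (cols(1), cols(2)))        // EMP_NAME as the key
  }
textFile.sortByKey().map(_._1).foreach(println)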
Help is appreciated. Thank you.
Upvotes: 2
Views: 3034
Reputation: 68
Python:
sc.parallelize([['Chris',29,110],['Bob',28,104],['Bob',25,102],['Adam',22,100]]).groupBy(lambda x: x[0]).sortByKey().flatMap(lambda x: list(x[1])).collect()
[['Adam', 22, 100], ['Bob', 25, 102], ['Bob', 28, 104], ['Chris', 29, 110]]
Scala:
sc.parallelize(List(Array("Chris",29,110),Array("Bob",28,104),Array("Bob",25,102),Array("Adam",22,100))).groupBy(x => x(0).asInstanceOf[String]).sortByKey().flatMap(x=> x._2).collect()
Array[Array[Any]] = Array(Array(Adam, 22, 100), Array(Bob, 28, 104), Array(Bob, 25, 102), Array(Chris, 29, 110))
You may want to make the other columns part of your key if you want them included in the sorting criteria; in the example above, the two Bob rows are not sorted by the 2nd column. A sketch of that follows below.
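A minimal sketch along those lines (assuming the same sample rows as above), using a composite (name, EMP_ID) key so the Bob rows are also ordered by the 2nd column:
sc.parallelize(List(Array("Chris",29,110),Array("Bob",28,104),Array("Bob",25,102),Array("Adam",22,100))).map(x => ((x(0).asInstanceOf[String], x(1).asInstanceOf[Int]), x)).sortByKey().map(_._2).collect()
// Bob's rows now come out ordered by EMP_ID as well: Adam, Bob 25, Bob 28, Chris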
Upvotes: 0
Reputation: 2072
Here I'm providing a dataset and the code to sort by key. If you don't find it helpful, please provide your code and we will look into the issue.
Data -> (tab-separated file)
EMP_NAME EMP_ID SALARY
Adam 22 100
Bob 25 102
Bob 28 104
Chris 29 110
Code ->
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

/*
 * @author Kshitij Kulshrestha
 */
object A1 {
  def main(args: Array[String]): Unit = {
    // set up environment
    val sparkHome = "/usr/spark_pack/spark-1.4.1-bin-hadoop2.4/"
    val sparkMasterUrl = "spark://SYSTEMX:7077"

    val conf = new SparkConf()
      .setAppName("A1")
      .setMaster("local[2]")
      .setSparkHome(sparkHome)
    val sc = new SparkContext(conf)

    // read the tab-separated file, drop the header row, and key each record by EMP_NAME
    val dataRDD = sc.textFile("src/Source/A1_data")
      .filter { !_.contains("EMP_NAME") }
      .map { x =>
        val temp = x.split("\t")
        (temp(0), (temp(1), temp(2)))
      }

    // pull everything into a single partition, then sort by the EMP_NAME key
    val sortedDataRDD = dataRDD.coalesce(1).sortByKey()
    sortedDataRDD.foreach(println)
  }
}
Output ->
(Adam,(22,100))
(Bob,(25,102))
(Bob,(28,104))
(Chris,(29,110))
Upvotes: 0
Reputation: 3212
If you are using a SparkConf configuration such as
val conf = new SparkConf().setMaster("local")
then the number of partitions created by default is 1.
But if you are using
val conf = new SparkConf().setMaster("local[*]")
and you have extra cores available, Spark will partition the data across them so that tasks can run in parallel.
To get the number of partitions Spark has created:
println(textFile.partitions.length)
// For my machine it was 2
If the data is partitioned, the sorting is done only on the elements within each partition, and at the end the output from each partition is merged. To avoid this scenario, you can set numPartitions to 1 in the sortByKey method, which pulls the data into one partition and then sorts it.
textFile.sortByKey(numPartitions = 1).foreach(println)
This forces everything into a single partition, and you will get properly sorted output across the whole input.
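A minimal end-to-end sketch, assuming the tab-separated file from the question is read and keyed by EMP_NAME (the path, variable name, and parsing here are assumptions):
val employees = sc.textFile("employees.txt")        // assumed path
  .filter(!_.startsWith("EMP_NAME"))                 // drop the header row
  .map { line =>
    val cols = line.split("\t")
    (cols(0), (cols(1), cols(2)))                    // key by EMP_NAME
  }
println(employees.partitions.length)                 // how many partitions Spark created
employees.sortByKey(numPartitions = 1).foreach(println)
// (Adam,(22,100)), (Bob,(25,102)), (Bob,(28,104)), (Chris,(29,110))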
Upvotes: 9