user3803714

Reputation: 5389

spark dataframe sorting based on subset of a column

I am generating the paired RDD/DataFrame through another process, but here is the code for generating the dataset, to help with debugging.

Here is the sample input file (/scratch/test2.txt), with tab-separated fields:

1	book1	author1	1.10
2	book2	author2	2.20
1	book3	author2	3.30

Here is the code for generating the dataframe

case class RefText (index: Int,  description: String, fName: String, weight: Double)
val annotation_split = sc.textFile("/scratch/test2.txt").map(_.split("\t"))     
val annotation =  annotation_split.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated =  annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")

annotate_concated.show()
+-----+-----------------+
|index|       annotation|
+-----+-----------------+
|    1|book1#author1#1.1|
|    2|book2#author2#2.2|
|    1|book3#author2#3.3|
+-----+-----------------+

//Here is how I generate pairedrdd. 
val paired_rdd : PairRDDFunctions[String, String] = annotate_concated.rdd.map(row => (row.getString(0), row.getString(1)))
val df  = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")

Here is sample data for my dataframe. The description column has the following format: text1#text2#weight|text1#text2#weight|....

user1 book1#author1#0.07841217886795074|tool1#desc1#1.27044260397331488|song1#album1#-2.052661673730870676|item1#category1#-0.005683148395350108

user2 book2#author1#4.07841217886795074|tool2#desc1#-1.27044260397331488|song2#album1#2.052661673730870676|item2#category1#-0.005683148395350108

I want to sort the entries in the description column by weight, in descending order.

The desired output is:

user1 tool1#desc1#1.27044260397331488|book1#author1#0.07841217886795074|item1#category1#-0.005683148395350108|song1#album1#-2.052661673730870676

user2 book2#author1#4.07841217886795074|song2#album1#2.052661673730870676|item2#category1#-0.005683148395350108|tool2#desc1#-1.27044260397331488

Any help with this will be greatly appreciated.

Upvotes: 0

Views: 486

Answers (1)

Derlin

Reputation: 9871

I don't think there is a straightforward way to reorder values inside a cell. I would personally do the ordering beforehand, i.e. on the annotation_split RDD.

Here is an example (I had to change the code a bit to make it work). File on HDFS (I used regular spaces between fields and @ as the record separator):

1 book1 author1 1.10 @ 2 book2 author2 2.20 @ 1 book3 author2 3.30 

Then:

case class RefText (index: Int,  description: String, fName: String, weight: Double)
// split by line, then split line into columns
val annotation_split = sc.textFile(path).flatMap(_.split(" @ ")).map{_.split(" ")} 

// HERE IS THE TRICK: sort the lines by weight (last column) in descending order
val annotation_sorted = annotation_split
    .map(line => (line.last.toDouble, line)) // toDouble matches the Double weight in RefText
    .sortByKey(false)
    .map(_._2)

// back to your code
val annotation =  annotation_sorted.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated =  annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")
// note: here, I replaced row.getString(0) by row.getInt(0) to avoid cast exception
val paired_rdd = annotate_concated.rdd.map(row => (row.getInt(0), row.getString(1)))
val df  = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")

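The only problem is that the ordering might get mixed up afterwards, depending on your level of parallelism, because reduceByKey does not guarantee the order in which values are combined. Another way is to map over the values of the description column and rewrite each one in sorted order (split, sort, join).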
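
The split, sort, join idea can be sketched as a small pure Scala function. This is only a minimal sketch, assuming every entry has the form text1#text2#weight with a parseable number after the last #. The name sortByWeightDesc is hypothetical and not part of the original code.

```scala
// Hypothetical helper (not from the original post): reorders the entries of
// one description cell by their trailing weight, largest first.
// Assumes each entry looks like text1#text2#weight.
def sortByWeightDesc(description: String): String =
  description
    .split('|')                 // split the cell into its entries
    .map(_.trim)                // tolerate spaces around the separator
    .filter(_.nonEmpty)         // drop empty pieces, e.g. from a trailing '|'
    .sortBy(entry => -entry.substring(entry.lastIndexOf('#') + 1).toDouble)
    .mkString("|")              // join the sorted entries back together

// sortByWeightDesc("book1#author1#1.1|book3#author2#3.3")
//   returns "book3#author2#3.3|book1#author1#1.1"

// Possible use from Spark (assumes org.apache.spark.sql.functions.{udf, col}
// is imported and df is the dataframe built above):
//   val sortDescription = udf(sortByWeightDesc _)
//   val sorted = df.withColumn("description", sortDescription(col("description")))
```

Because the sort happens inside each cell after the values have been concatenated, the result should not depend on the order in which reduceByKey combined them.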

Upvotes: 0
