Reputation: 5389
I am using Spark 1.5.0 and I have this issue:
val df = paired_rdd.reduceByKey {
  case (val1, val2) => val1 + "|" + val2
}.toDF("user_id", "description")
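For context, here is a minimal sketch of the setup the snippet above assumes; the sample pairs and the local SparkContext are made up for illustration, and in Spark 1.5 .toDF on an RDD requires the SQLContext implicits to be in scope:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local[*]", "weights")   // hypothetical local context
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._                      // required for rdd.toDF(...)

// Hypothetical (user_id, "text#text#weight") pairs feeding reduceByKey above
val paired_rdd = sc.parallelize(Seq(
  ("user1", "book1#author1#0.078"),
  ("user1", "tool1#desc1#0.270")
))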
Here is sample data for df; as you can see, the description column has this format (text1#text2#weight|text1#text2#weight|...):
user_id: user1
description: book1#author1#0.07841217886795074|tool1#desc1#0.27044260397331488|song1#album1#-0.052661673730870676|item1#category1#-0.005683148395350108
I want to sort this df based on weight in descending order. Here is what I tried:
First, split the contents at "|"; then, for each of those strings, split at "#", take the 3rd token (the weight), and convert it to a Double:
val getSplitAtWeight = udf((str: String) => {
  str.split("|").foreach(_.split("#")(2).toDouble)
})
Then sort based on the weight value returned by the udf, in descending order:
val df_sorted = df.sort(getSplitAtWeight(col("description")).desc)
I get the following error:
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.functions$.udf(functions.scala:2242)
Upvotes: 0
Views: 11394
Reputation: 215117
Changing foreach in your udf to map as follows will eliminate the exception:
def getSplitAtWeight = udf((str: String) => {
  str.split('|').map(_.split('#')(2).toDouble)
})
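Note that this udf returns an Array[Double], one value per "|"-separated entry. If you want a single numeric sort key per row, one possible variation (my assumption, not part of the original answer) is to collapse the weights into one Double, e.g. the maximum:

import org.apache.spark.sql.functions.{udf, col}

// Variation (assumption): return the largest weight per row,
// so the sort key is a plain numeric column.
val getMaxWeight = udf((str: String) => {
  str.split('|').map(_.split('#')(2).toDouble).max
})

val df_sorted = df.sort(getMaxWeight(col("description")).desc)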
The problem with your method is that foreach (on Array, List, or any Scala collection) doesn't return anything: its result is of type Unit, and Spark cannot derive a schema for Unit, which is why you get the exception, whereas map returns the collection of transformed values. Note also that the fixed version splits on the Char '|' rather than the String "|": the String overload of split treats its argument as a regex, and "|" as a regex matches the empty string, splitting between every character, whereas the Char overload splits on the literal character. To understand more about foreach, check this blog.
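You can see the foreach/map difference directly in the Scala REPL, independent of Spark:

val parts = "a#b#1.5|c#d#0.2".split('|')

parts.foreach(_.split('#')(2).toDouble)            // result type is Unit: the values are discarded
val weights = parts.map(_.split('#')(2).toDouble)  // returns Array(1.5, 0.2)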
Upvotes: 2