Marco De Virgilis

Reputation: 1089

From Column to Array Scala Spark

I am trying to apply a function to a Column in Scala, but I am running into difficulties.

I get this error:

found   : org.apache.spark.sql.Column
required: Array[Double]

Is there a way to convert a Column to an Array? Thank you

Update:

Thank you very much for your answer; I think I am getting closer to what I am trying to achieve. Here is a bit more context.

Here is the code:

object Targa_Indicators_Full {

  def get_quantile(variable: Array[Double], perc: Double): Double = {
    val sorted_vec: Array[Double] = variable.sorted
    val pos: Double = Math.round(perc * variable.length) - 1
    val quant: Double = sorted_vec(pos.toInt)
    quant
  }

  def main(args: Array[String]): Unit = {

    val get_quantileUDF = udf(get_quantile _)

    val plate_speed = trips_df
      .groupBy($"plate")
      .agg(
        sum($"time_elapsed").alias("time"),
        sum($"space").alias("distance"),
        stddev_samp($"distance" / $"time_elapsed").alias("sd_speed"),
        get_quantileUDF($"distance" / $"time_elapsed", .75).alias("Quant_speed"))
      .withColumn("speed", $"distance" / $"time")

  }
}

Now I get this error:

type mismatch;
[error]  found   : Double(0.75)
[error]  required: org.apache.spark.sql.Column
[error]  get_quantileUDF($"distanza"/$"tempo_intermedio",.75).alias("IQR_speed")
                                                         ^
[error] one error found

What can I do? Thanks.

Upvotes: 1

Views: 1078

Answers (1)

Constantine

Reputation: 1416

You cannot apply an ordinary Scala function directly to a DataFrame column. You have to wrap your existing function in a user-defined function (UDF), which Spark lets you define from regular Scala functions.

For example, suppose you have a DataFrame with an array column:

scala> val df=sc.parallelize((1 to 100).toList.grouped(5).toList).toDF("value")
df: org.apache.spark.sql.DataFrame = [value: array<int>]

And you have defined a function to apply to the array-typed column:

def convert( arr:Seq[Int] ) : String = {
  arr.mkString(",")
}

You have to convert it to a UDF before applying it to the column:

import org.apache.spark.sql.functions.{col, udf}
val convertUDF = udf(convert _)

And then you can apply your function:

df.withColumn("new_col", convertUDF(col("value")))
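For the quantile function in the updated question, the same idea might look like the sketch below. It rests on three assumptions that are not confirmed by the original code: every argument passed to a UDF must be a Column, so the literal 0.75 is wrapped in lit; a plain UDF is not an aggregate function, so the per-plate values are first gathered with collect_list; and an array column is assumed to arrive in a Scala UDF as Seq[Double] rather than Array[Double]. The object name, the sample rows, and the precomputed "speed" column are hypothetical stand-ins for trips_df.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, lit, udf}

object QuantileUdfSketch {

  // Same rounding rule as get_quantile in the question, but taking a Seq,
  // clamping the index into range, and returning NaN for an empty input.
  def quantile(values: Seq[Double], perc: Double): Double = {
    if (values.isEmpty) Double.NaN
    else {
      val sorted = values.sorted
      val pos = Math.round(perc * sorted.length).toInt - 1
      sorted(math.max(0, math.min(sorted.length - 1, pos)))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("quantile-udf-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for trips_df.
    val trips = Seq(
      ("A", 10.0), ("A", 20.0), ("A", 30.0), ("A", 40.0),
      ("B", 5.0)
    ).toDF("plate", "speed")

    val quantileUDF = udf(quantile _)

    val result = trips
      .groupBy($"plate")
      // Gather each plate's speeds into one array column.
      .agg(collect_list($"speed").alias("speeds"))
      // lit(0.75) turns the Double literal into a Column.
      .withColumn("quant_speed", quantileUDF($"speeds", lit(0.75)))
      .drop("speeds")

    result.show()
    spark.stop()
  }
}
```

Collecting every value of a group into a single array can be expensive for large groups, so this is only a reasonable approach when each plate has a modest number of rows.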

Upvotes: 2
