ozlemg
ozlemg

Reputation: 436

How to calculate euclidean distance of each row in a dataframe to a constant reference array

I have a dataframe which is created from parquet files that has 512 columns(all float values).

I am trying to calculate euclidean distance of each row in my dataframe to a constant reference array.

My development environment is Zeppelin 0.7.3 with spark 2.1 and Scala. Here is the zeppelin paragraphs I run:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

//Create dataframe from parquet file
val filePath = "/tmp/vector.parquet/*.parquet" 
val df = spark.read.parquet(filePath)

//Create assembler and vectorize df
val assembler = new VectorAssembler()
  .setInputCols(df.columns)
  .setOutputCol("features")
val training = assembler.transform(df)

//Create udf
val eucDisUdf = udf((features: Vector, 
myvec:Vector)=>Vectors.sqdist(features, myvec))

//Cretae ref vector
val myScalaVec = Vectors.dense( Array.fill(512)(25.44859))
val distDF = 
training2.withColumn("euc",eucDisUdf($"features",myScalaVec))

This code gives the following error for eucDisUdf call:

error: type mismatch;  found   : org.apache.spark.ml.linalg.Vector 
required: org.apache.spark.sql.Column

I appreciate any idea how to eliminate this error and compute distances properly in scala.

Upvotes: 2

Views: 1681

Answers (1)

philantrovert
philantrovert

Reputation: 10092

I think you can use currying to achieve that:

def eucDisUdf(myvec:Vector) = udf((features: Vector) => Vectors.sqdist(features, myvec))

val myScalaVec = Vectors.dense(Array.fill(512)(25.44859))

val distDF = training2.withColumn( "euc", eucDisUdf(myScalaVec)($"features") )

Upvotes: 2

Related Questions