Yong Hyun Kwon

Reputation: 379

Pyspark euclidean distance between entry and column

I am working with PySpark, and I am wondering whether there is a smart way to get the Euclidean distance between one row's array entry and the whole column. For instance, there is a dataset like this.

+--------------------+---+
|            features| id|
+--------------------+---+
|[0,1,2,3,4,5     ...|  0|
|[0,1,2,3,4,5     ...|  1|
|[1,2,3,6,7,8     ...|  2|
+--------------------+---+

Choose one of the entries, e.g. the one with id == 1, and calculate the Euclidean distance from it to every row. In this case the result should be [0, 0, sqrt(1+1+1+9+9+9)]: the row [1,2,3,6,7,8] differs from [0,1,2,3,4,5] by 1,1,1,3,3,3 elementwise, so the distance is sqrt(30) ≈ 5.48. Can anybody figure out how to do this efficiently? Thanks!

Upvotes: 10

Views: 15255

Answers (4)

Narendra Rana

Reputation: 571

Here is an implementation that uses the SQL function power() to compute the Euclidean distance between matching rows in two dataframes.

cols2Join = ['Key1','Key2']
colsFeature =['Feature1','Feature2','Feature3','Feature4']
columns = cols2Join + colsFeature

valuesA = [('key1value1','key2value1',111,22,33,.334),('key1value3','key2value3', 333,444,12,.445),('key1value5','key2value5',555,666,101,.99),('key1value7','key2value7',777,888,10,.019)]
table1 = spark.createDataFrame(valuesA,columns)
valuesB = [('key1value1','key2value1',22,33,3,.1),('key1value3','key2value3', 88,99,4,1.23),('key1value5','key2value5',4,44,1,.998),('key1value7','key2value7',9,99,1,.3)]
table2 = spark.createDataFrame(valuesB,columns)

# Create the SQL expression with a list comprehension; the SQL function power() computes the Euclidean distance inline
beginExpr = 'power(('
InnerExpr = ['power((a.{}-b.{}),2)'.format(x, x) for x in colsFeature]
InnerExpr = '+'.join(InnerExpr)
endExpr = '),0.5) AS EuclideanDistance'
distanceExpr = beginExpr + InnerExpr + endExpr
Expr = cols2Join + [distanceExpr]

# now just join the tables and use selectExpr to get the Euclidean distance
outDF = table1.alias('a').join(table2.alias('b'), cols2Join, how="inner").selectExpr(Expr)

display(outDF)
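
For reference, with the four feature columns above the generated distanceExpr string works out to:

power((power((a.Feature1-b.Feature1),2)+power((a.Feature2-b.Feature2),2)+power((a.Feature3-b.Feature3),2)+power((a.Feature4-b.Feature4),2)),0.5) AS EuclideanDistance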

Upvotes: 2

mayank agrawal

Reputation: 2545

If you want the Euclidean distance between a fixed entry and a whole column, simply do this.

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance

fixed_entry = [0,3,2,7...] #for example, the entry against which you want distances
distance_udf = F.udf(lambda x: float(distance.euclidean(x, fixed_entry)), FloatType())
df = df.withColumn('distances', distance_udf(F.col('features')))

Your df will have a column of distances.

Upvotes: 13

pauli

Reputation: 4301

If you need to find Euclidean distances between only one particular row and every other row in the dataframe, then you can filter & collect that row and pass it to a udf, as sketched below.
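
A minimal sketch of that approach, assuming the dataframe from the question (columns features and id) and using scipy for the distance; the output column name dist_to_1 is only illustrative:

import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance

# collect the reference row (id == 1) to the driver
fixed_row = df.filter(F.col("id") == 1).first().features

# the udf computes the distance from the collected row to each row's features
dist_udf = F.udf(lambda v: float(distance.euclidean(v, fixed_row)), FloatType())
df = df.withColumn("dist_to_1", dist_udf("features"))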

But if you need to calculate the distances between all pairs, you need to use a join.
Repartition the dataframe by id; it will speed up the join operation. There is no need to calculate the full pairwise matrix: just calculate the upper or lower half and replicate it. I wrote a function for myself based on this logic.

import pyspark.sql.functions as f
import pyspark.sql.types as t

df = df.repartition("id")
df.cache()
df.show()


# metric = any callable that returns the distance between two feature vectors
def pairwise_metric(Y, metric, col_name="metric"):

    Y2 = Y.select(f.col("id").alias("id2"),
                  f.col("features").alias("features2"))

    # join to create the lower (or upper) half of the pair matrix
    Y = Y.join(Y2, Y.id < Y2.id2, "inner")

    def sort_list(x):
        # sort the collected (id2, dist) structs by id2 and keep only the distances
        x = sorted(x, key=lambda y: y[0])
        return [y[1] for y in x]

    udf_diff = f.udf(lambda x, y: metric(x, y), t.FloatType())
    udf_sort = f.udf(sort_list, t.ArrayType(t.FloatType()))

    # diagonal elements of the distance matrix: each id paired with itself at distance 0
    # (column order matches Y below, since union is positional)
    Yid = (Y2.select("id2").distinct()
             .select(f.col("id2").alias("id"), "id2")
             .withColumn("dist", f.lit(0.0)))

    Y = Y.withColumn("dist", udf_diff("features",
             "features2")).drop("features", "features2")

    # just swap the column names and take the union to get the other half
    Y = Y.union(Y.select(f.col("id2").alias("id"),
            f.col("id").alias("id2"), "dist"))
    # union for the diagonal elements of the distance matrix
    Y = Y.union(Yid)

    st1 = f.struct(["id2", "dist"]).alias("vals")
    # group by id, aggregate into a list and sort it by id2
    Y = (Y.select("id", st1).groupBy("id")
          .agg(f.collect_list("vals").alias("vals"))
          .withColumn("dist", udf_sort("vals")).drop("vals"))

    return Y.select(f.col("id").alias("id1"), f.col("dist").alias(col_name))
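
A quick way to call the helper (an illustrative usage, not part of the original answer), plugging in scipy's euclidean as the metric:

from scipy.spatial import distance

# Euclidean distance as the callable metric; the output column name is arbitrary
dist_df = pairwise_metric(df, lambda x, y: float(distance.euclidean(x, y)), col_name="euclidean")
dist_df.show(truncate=False)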

Upvotes: 0

ryan

Reputation: 361

You can use BucketedRandomProjectionLSH [1] to get a Cartesian join with distances between the rows of your data frame.

from pyspark.ml.feature import BucketedRandomProjectionLSH

brp = BucketedRandomProjectionLSH(
    inputCol="features", outputCol="hashes", seed=12345, bucketLength=1.0
)
model = brp.fit(df)
model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")
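
approxSimilarityJoin returns each pair as struct columns datasetA and datasetB along with the EuclideanDistance column, and it also matches every row with itself. A small follow-up (not part of the original answer, assuming the id column from the question) to drop those self-matches:

joined = model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")
joined = joined.filter("datasetA.id != datasetB.id")  # keep only distinct pairs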

You can also get distances from one row to the whole column with approxNearestNeighbors [2], but the results are limited by numNearestNeighbors, so you could give it the count of the entire data frame.

one_row = df.where(df.id == 1).first().features
model.approxNearestNeighbors(df, one_row, df.count()).collect()

Also, make sure to convert your data to Vectors!

from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT

to_dense_vector = F.udf(Vectors.dense, VectorUDT())
df = df.withColumn('features', to_dense_vector('features'))

[1] https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=approx#pyspark.ml.feature.BucketedRandomProjectionLSH

[2] https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html?highlight=approx#pyspark.ml.feature.BucketedRandomProjectionLSHModel.approxNearestNeighbors

Upvotes: 4
