Reputation: 379
I am working with PySpark and wondering if there is any smart way to get the Euclidean distance between one row's array and the whole column. For instance, there is a dataset like this:
+--------------------+---+
| features| id|
+--------------------+---+
|[0,1,2,3,4,5 ...| 0|
|[0,1,2,3,4,5 ...| 1|
|[1,2,3,6,7,8 ...| 2|
+--------------------+---+
Choose one of the rows, e.g. id == 1, and calculate the Euclidean distance between it and every row. In this case, the result should be [0, 0, sqrt(1+1+1+9+9+9)]. Can anybody figure out how to do this efficiently? Thanks!
Upvotes: 10
Views: 15255
Reputation: 571
Here is an implementation that uses the SQL function power() to compute the Euclidean distance between matching rows in two dataframes:
cols2Join = ['Key1', 'Key2']
colsFeature = ['Feature1', 'Feature2', 'Feature3', 'Feature4']
columns = cols2Join + colsFeature

valuesA = [('key1value1', 'key2value1', 111, 22, 33, .334),
           ('key1value3', 'key2value3', 333, 444, 12, .445),
           ('key1value5', 'key2value5', 555, 666, 101, .99),
           ('key1value7', 'key2value7', 777, 888, 10, .019)]
table1 = spark.createDataFrame(valuesA, columns)

valuesB = [('key1value1', 'key2value1', 22, 33, 3, .1),
           ('key1value3', 'key2value3', 88, 99, 4, 1.23),
           ('key1value5', 'key2value5', 4, 44, 1, .998),
           ('key1value7', 'key2value7', 9, 99, 1, .3)]
table2 = spark.createDataFrame(valuesB, columns)

# Create the SQL expression using a list comprehension; the SQL function
# power() computes the Euclidean distance inline
beginExpr = 'power(('
innerExpr = ['power((a.{}-b.{}),2)'.format(x, x) for x in colsFeature]
innerExpr = '+'.join(innerExpr)
endExpr = '),0.5) AS EuclideanDistance'
distanceExpr = beginExpr + innerExpr + endExpr
exprs = cols2Join + [distanceExpr]

# Now just join the tables and use selectExpr to get the Euclidean distance
outDF = table1.alias('a').join(table2.alias('b'), cols2Join, how="inner").selectExpr(exprs)
outDF.show()  # use display(outDF) instead in a Databricks notebook
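For reference, with the feature columns above, the assembled distanceExpr is exactly this single SQL snippet:
power((power((a.Feature1-b.Feature1),2)+power((a.Feature2-b.Feature2),2)+power((a.Feature3-b.Feature3),2)+power((a.Feature4-b.Feature4),2)),0.5) AS EuclideanDistance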
Upvotes: 2
Reputation: 2545
If you want the Euclidean distance between a fixed entry and a whole column, simply do this:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from scipy.spatial import distance
fixed_entry = [0, 3, 2, 7, ...]  # for example, the entry against which you want distances
distance_udf = F.udf(lambda x: float(distance.euclidean(x, fixed_entry)), FloatType())
df = df.withColumn('distances', distance_udf(F.col('features')))
Your df will have a column of distances.
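If you would rather avoid the scipy dependency, here is a minimal sketch of the same computation using Spark's built-in higher-order functions (assumes Spark 3.1+ for F.zip_with and F.aggregate, and that fixed_entry is the plain Python list from above):
from pyspark.sql import functions as F

# turn the fixed entry into an array column, then sum the squared
# element-wise differences and take the square root
fixed_col = F.array(*[F.lit(float(v)) for v in fixed_entry])
df = df.withColumn(
    'distances',
    F.sqrt(F.aggregate(
        F.zip_with('features', fixed_col, lambda a, b: (a - b) * (a - b)),
        F.lit(0.0),
        lambda acc, x: acc + x)))
This keeps the whole computation inside the JVM instead of round-tripping every row through a Python udf.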
Upvotes: 13
Reputation: 4301
If you need to find the Euclidean distances between only one particular row and every other row in the dataframe, then you can filter & collect that row and pass it to a udf, as in the sketch below.
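A minimal sketch of that approach, assuming the question's df with id and features columns, and reusing scipy's euclidean from the answer above (dist_to_1 is a made-up column name):
import pyspark.sql.functions as f
import pyspark.sql.types as t
from scipy.spatial import distance

# collect the single row (id == 1) and close over it in the udf
target = df.filter(f.col("id") == 1).first().features
udf_dist = f.udf(lambda v: float(distance.euclidean(v, target)), t.FloatType())
df = df.withColumn("dist_to_1", udf_dist("features"))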
But if you need to calculate the distances between all pairs, you need to use a join.
Repartition the dataframe by id; it will speed up the join operation. There is no need to calculate the full pairwise matrix: just calculate the upper or lower half and replicate it. I wrote a function for myself based on this logic.
import pyspark.sql.functions as f
import pyspark.sql.types as t

df = df.repartition("id")
df.cache()
df.show()

# metric = any callable that computes the distance between two vectors
def pairwise_metric(Y, metric, col_name="metric"):
    Y2 = Y.select(f.col("id").alias("id2"),
                  f.col("features").alias("features2"))

    # join to create the lower or upper half of the pair matrix
    Y = Y.join(Y2, Y.id < Y2.id2, "inner")

    def sort_list(x):
        x = sorted(x, key=lambda y: y[0])
        x = list(map(lambda y: y[1], x))
        return x

    udf_diff = f.udf(lambda x, y: metric(x, y), t.FloatType())
    udf_sort = f.udf(sort_list, t.ArrayType(t.FloatType()))

    # diagonal elements of the distance matrix (each id to itself)
    Yid = Y2.select("id2").distinct().select(
        "id2", f.col("id2").alias("id")).withColumn("dist", f.lit(0.0))

    Y = Y.withColumn("dist", udf_diff(
        "features", "features2")).drop("features", "features2")

    # just swap the column names and take the union to get the other half
    Y = Y.union(Y.select(f.col("id2").alias("id"),
                         f.col("id").alias("id2"), "dist"))
    # union for the diagonal elements of the distance matrix
    Y = Y.union(Yid)

    st1 = f.struct(["id2", "dist"]).alias("vals")

    # group by id, aggregate, and sort each row of the matrix by id2
    Y = (Y.select("id", st1).groupBy("id")
          .agg(f.collect_list("vals").alias("vals"))
          .withColumn("dist", udf_sort("vals")).drop("vals"))

    return Y.select(f.col("id").alias("id1"), f.col("dist").alias(col_name))
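A hedged usage sketch of the function above, reusing scipy's euclidean (the float() cast matches the FloatType udf, since euclidean otherwise returns a numpy scalar):
from scipy.spatial import distance

# each output row holds an id and its full row of the distance matrix
dist_df = pairwise_metric(df, lambda a, b: float(distance.euclidean(a, b)),
                          col_name="euclidean")
dist_df.show()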
Upvotes: 0
Reputation: 361
You can use BucketedRandomProjectionLSH [1] to get an approximate cartesian join of distances between the rows of your data frame:
from pyspark.ml.feature import BucketedRandomProjectionLSH
brp = BucketedRandomProjectionLSH(
inputCol="features", outputCol="hashes", seed=12345, bucketLength=1.0
)
model = brp.fit(df)
model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")
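Note that approxSimilarityJoin returns both orderings of every pair plus self-matches; here is a small sketch, assuming the question's id column, that keeps each unordered pair once:
from pyspark.sql import functions as F

pairs = model.approxSimilarityJoin(df, df, 3.0, distCol="EuclideanDistance")
# datasetA/datasetB are struct columns wrapping the original rows
pairs = (pairs.filter(F.col("datasetA.id") < F.col("datasetB.id"))
              .select(F.col("datasetA.id").alias("id_a"),
                      F.col("datasetB.id").alias("id_b"),
                      "EuclideanDistance"))
pairs.show()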
You can also get the distances from one row to the whole column with approxNearestNeighbors [2], but the results are limited by numNearestNeighbors, so you could pass it the count of the entire data frame:
one_row = df.where(df.id == 1).first().features
model.approxNearestNeighbors(df, one_row, df.count()).collect()
Also, make sure to convert your data to Vectors!
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F

to_dense_vector = F.udf(Vectors.dense, VectorUDT())
df = df.withColumn('features', to_dense_vector('features'))
Upvotes: 4