Reputation: 1857
I have two DataFrames, and I want to apply distance.euclidean(df1.select(col), df2.select(col))
for each column of the two DataFrames.
Example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1,10),(2,13)],["A","B"])
df2 = spark.createDataFrame([(3,40),(2,20)],["A","B"])
# Apply the distance function to each column of `df1` and `df2`
from scipy.spatial import distance
for col in df1.columns:
    # Collect each column as a flat Python list on the driver, then compute the distance
    d = distance.euclidean(df1.select(col).rdd.flatMap(lambda x: x).collect(),
                           df2.select(col).rdd.flatMap(lambda x: x).collect())
    print(col, d)
The number of columns is large, about 5,000. Is there any way to calculate the distances for all columns in parallel instead of one by one in a for loop?
Upvotes: 0
Views: 670
Reputation: 19415
As far as I know there is no built-in Euclidean distance function, but you can easily build one with sum, pow, and sqrt, as the equation is pretty simple:
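For two column vectors x = (x_1, ..., x_n) and y = (y_1, ..., y_n),

d(x, y) = sqrt((x_1 - y_1)^2 + ... + (x_n - y_n)^2)

which maps directly onto pow, sum, and sqrt. First join the two frames on a shared key: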
# Example frames with an explicit join key `id`
df1 = spark.createDataFrame([(1, 10, 1), (2, 13, 2), (3, 5, 3)], ["A", "B", "id"])
df2 = spark.createDataFrame([(3, 40, 1), (2, 20, 2), (3, 10, 3)], ["A", "B", "id"])

# Alias the frames so the duplicate column names stay addressable after the join
df1 = df1.alias("df1")
df2 = df2.alias("df2")
df = df1.join(df2, 'id', 'inner')
df.show()
Output:
+---+---+---+---+---+
| id| A| B| A| B|
+---+---+---+---+---+
| 1| 1| 10| 3| 40|
| 3| 3| 5| 3| 10|
| 2| 2| 13| 2| 20|
+---+---+---+---+---+
expression = ['sqrt(sum(pow((df1.{col} - df2.{col}),2))) as {col}'.format(col=c) for c in df1.columns if c !='id']
print(expression)
df.selectExpr(expression).show()
Output:
['sqrt(sum(pow((df1.A - df2.A),2))) as A', 'sqrt(sum(pow((df1.B - df2.B),2))) as B']
+---+-----------------+
| A| B|
+---+-----------------+
|2.0|31.20897306865447|
+---+-----------------+
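If you'd rather not build SQL strings, here is a minimal sketch of the same aggregation with the DataFrame API (assuming the aliased, joined df from above); Spark evaluates all the aggregates in a single pass over the joined data:

from pyspark.sql import functions as F

# One aggregate Column per data column; all are computed in one job
aggs = [F.sqrt(F.sum(F.pow(F.col("df1." + c) - F.col("df2." + c), 2))).alias(c)
        for c in df1.columns if c != "id"]
df.agg(*aggs).show()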
P.S.: collect should only be used when the DataFrame is small, as all of the data is loaded into the memory of your Spark driver.
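P.P.S.: the frames in the question have no id column to join on. One sketch of attaching a row index via zipWithIndex, assuming both frames yield rows in a consistent order (with_row_id is a hypothetical helper, not part of the code above):

from pyspark.sql import Row

def with_row_id(df):
    # Hypothetical helper: pair each row with its position,
    # then fold the index back in as an `id` column
    return df.rdd.zipWithIndex().map(
        lambda pair: Row(id=pair[1], **pair[0].asDict())
    ).toDF()

Then with_row_id(df1) and with_row_id(df2) can be joined on id as shown above.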
Upvotes: 1