Reputation: 1001
I have two Spark dataframes:
> df1
+--------+-----------------------------+
|    word|                     init_vec|
+--------+-----------------------------+
|   venus|[-0.235, -0.060, -0.609, ...]|
+--------+-----------------------------+
> df2
+-----------------------------+-----+
|                     targ_vec|   id|
+-----------------------------+-----+
|[-0.272, -0.070, -0.686, ...]| 45ha|
|[-0.234, -0.060, -0.686, ...]| 98pb|
|[-0.562, -0.334, -0.981, ...]| c09j|
+-----------------------------+-----+
I need to find the Euclidean distance between init_vec from df1 and each vector in targ_vec of df2, and return the 3 vectors closest to init_vec.
> desired_output
+--------+-----------------------------+-----+----------+
|    word|                     targ_vec|   id|  distance|
+--------+-----------------------------+-----+----------+
|   venus|[-0.234, -0.060, -0.686, ...]| 98pb|some_value|
|   venus|[-0.221, -0.070, -0.613, ...]| tg67|some_value|
|   venus|[-0.240, -0.091, -0.676, ...]| jhg6|some_value|
+--------+-----------------------------+-----+----------+
I need to implement this using PySpark.
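For reference, a minimal sketch of this setup, assuming both vector columns are plain array&lt;double&gt; (the rows below are illustrative and truncated):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# illustrative, truncated vectors; schema: word string, init_vec array<double>
df1 = spark.createDataFrame(
    [('venus', [-0.235, -0.060, -0.609])],
    ['word', 'init_vec'])
# schema: targ_vec array<double>, id string
df2 = spark.createDataFrame(
    [([-0.272, -0.070, -0.686], '45ha'),
     ([-0.234, -0.060, -0.686], '98pb'),
     ([-0.562, -0.334, -0.981], 'c09j')],
    ['targ_vec', 'id'])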
Upvotes: 1
Views: 642
Reputation: 1857
After a cross join between df1 and df2 to add df1.init_vec to every row of df2, compute the distance:
from pyspark.sql import functions as f

# cross join makes df1.init_vec available next to every targ_vec of df2
df = df1.crossJoin(df2)
# Euclidean distance: square root of the sum of squared element-wise differences
df = df.withColumn('distance', f.sqrt(f.expr('aggregate(transform(targ_vec, (element, idx) -> power(abs(element - element_at(init_vec, cast(idx + 1 as int))), 2)), cast(0 as double), (acc, value) -> acc + value)')))
Then you can sort the dataframe by distance and keep the 3 rows with the smallest values.
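For instance, a minimal sketch of that final step, assuming df is the joined frame from above (df_top3 is just an illustrative name):
# 3 closest vectors, with columns matching the desired output
df_top3 = (df
           .orderBy('distance')
           .select('word', 'targ_vec', 'id', 'distance')
           .limit(3))
df_top3.show(truncate=False)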
Upvotes: 0