red_quark

Reputation: 1001

Calculate distance between vectors from different Spark dataframes

I have two Spark dataframes:

> df1
+--------+-----------------------------+
|   word |                     init_vec|
+--------+-----------------------------+
|  venus |[-0.235, -0.060, -0.609, ...]|
+--------+-----------------------------+


> df2
+-----------------------------+-----+
|                    targ_vec |   id|
+-----------------------------+-----+
|[-0.272, -0.070, -0.686, ...]| 45ha|
|[-0.234, -0.060, -0.686, ...]| 98pb|
|[-0.562, -0.334, -0.981, ...]| c09j|
+-----------------------------+-----+

I need to find the Euclidean distance between init_vec from df1 and each vector in targ_vec of df2, and return the top 3 vectors closest to init_vec.

> desired_output
+--------+-----------------------------+-----+----------+
|   word |                     targ_vec|   id|  distance|
+--------+-----------------------------+-----+----------+
|  venus |[-0.234, -0.060, -0.686, ...]| 98pb|some_value|
|  venus |[-0.221, -0.070, -0.613, ...]| tg67|some_value|
|  venus |[-0.240, -0.091, -0.676, ...]| jhg6|some_value|
+--------+-----------------------------+-----+----------+

I need to implement this using PySpark.
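
For reference, a minimal reproducible setup, assuming the vectors are stored as plain array<double> columns (toy 3-element vectors stand in for the truncated ones above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data mirroring the tables above; the real vectors are longer
df1 = spark.createDataFrame(
    [('venus', [-0.235, -0.060, -0.609])],
    ['word', 'init_vec'])
df2 = spark.createDataFrame(
    [([-0.272, -0.070, -0.686], '45ha'),
     ([-0.234, -0.060, -0.686], '98pb'),
     ([-0.562, -0.334, -0.981], 'c09j')],
    ['targ_vec', 'id'])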

Upvotes: 1

Views: 642

Answers (1)

ARCrow

Reputation: 1857

Cross join df1 and df2 so that df1's init_vec (and word) is attached to every row of df2, then compute the Euclidean distance, i.e. the square root of the summed squared element-wise differences:

from pyspark.sql import functions as f

df = df2.crossJoin(df1)  # attach df1's word and init_vec to every row of df2

# Euclidean distance: sqrt of the sum of squared element-wise differences
df = df.withColumn('distance', f.sqrt(f.expr('aggregate(transform(targ_vec, (element, idx) -> power(abs(element - element_at(init_vec, cast(idx + 1 as int))), 2)), cast(0 as double), (acc, value) -> acc + value)')))
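
Because transform and aggregate are Spark SQL higher-order functions, the whole computation runs natively inside Spark; no Python UDF (with its serialization overhead) is needed.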

Then sort the dataframe by distance and keep the 3 rows with the smallest values.
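
A minimal sketch of that last step, using the column names from the question:

top3 = (df
        .select('word', 'targ_vec', 'id', 'distance')
        .orderBy('distance')
        .limit(3))
top3.show(truncate=False)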

Upvotes: 0
