Reputation: 7967
I have a Spark DataFrame with a column named features
that holds vectors of data. This column is the output of PySpark's StandardScaler
object. I am creating sample data here similar to the data I have.
from pyspark.ml.feature import VectorAssembler

# create sample data
arr = [[1, 2, 3], [4, 5, 6]]
df_example = spark.createDataFrame(arr, ['A', 'B', 'C'])
assembler = VectorAssembler(inputCols=df_example.columns, outputCol='features')
df_vector = assembler.transform(df_example).select('features')
>>> df_vector.show()
+-------------+
| features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+
I want to find the Euclidean distance between each vector and a particular cluster center (an array of the same length). Assume the cluster center is:
cluster_center_0 = np.array([0.6, 0.7, 0.8])
How do I achieve this? I tried creating a SQL query, hoping that I could get access to the elements inside the vector using OFFSET
and that from there it would be easy to calculate the distances. But that didn't work out. This is the query I used; unfortunately it doesn't work, and I have very limited knowledge of SQL.
SELECT aml_cluster_inpt_features
aml_cluster_inpt_features[OFFSET(0)] AS offset_0,
aml_cluster_inpt_features[OFFSET(1)] AS offset_1,
aml_cluster_inpt_features[OFFSET(2)] AS offset_2,
aml_cluster_inpt_features[OFFSET(3)] AS offset_3,
FROM event_rate_holder
Is there a simpler way of doing this? If not, am I headed in the right direction with the sql query above?
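For reference, once the individual elements were extracted, the computation the query is aiming at would reduce to the usual Euclidean formula (square root of the summed squared differences). A plain-Python sketch using the first sample vector and the cluster center above:

```python
import math

cluster_center_0 = [0.6, 0.7, 0.8]
row = [1.0, 2.0, 3.0]  # first row of df_vector

# sum the squared per-element differences, then take the square root
dist = math.sqrt(sum((a - b) ** 2 for a, b in zip(row, cluster_center_0)))
```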
Upvotes: 2
Views: 1127
Reputation: 35229
Just use UDF:
import numpy as np

from pyspark.sql.functions import udf
from scipy.spatial import distance

def euclidean(v1):
    # closure over the fixed center v1; the inner UDF handles null vectors
    @udf("double")
    def _(v2):
        return distance.euclidean(v1, v2) if v2 is not None else None
    return _

center = np.array([0.6, 0.7, 0.8])

df_vector.withColumn("dist", euclidean(center)("features")).show()
# +-------------+-----------------+
# | features| dist|
# +-------------+-----------------+
# |[1.0,2.0,3.0]|2.586503431275513|
# |[4.0,5.0,6.0]|7.555792479945437|
# +-------------+-----------------+
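The UDF above leans on SciPy; if you'd rather avoid that dependency, the same distance can be computed with NumPy alone (e.g. inside the inner function, in place of `distance.euclidean`). A minimal sketch reproducing the first row's value:

```python
import numpy as np

center = np.array([0.6, 0.7, 0.8])
v = np.array([1.0, 2.0, 3.0])  # first row of df_vector

# Euclidean distance is the L2 norm of the difference vector
dist = float(np.linalg.norm(v - center))
```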
If you want to disassemble vectors into separate columns, see How to split Vector into columns - using PySpark.
Upvotes: 2