Clock Slave

Reputation: 7967

PySpark: accessing vector elements in sql

I have a Spark DataFrame with a column named features that holds vectors of data. This column is the output of PySpark's StandardScaler. Here I create a sample dataset similar to the one I have:

# create sample data
from pyspark.ml.feature import VectorAssembler

arr = [[1, 2, 3], [4, 5, 6]]
df_example = spark.createDataFrame(arr, ['A', 'B', 'C'])

# assemble all columns into a single vector column named 'features'
assembler = VectorAssembler(inputCols=df_example.columns, outputCol='features')
df_vector = assembler.transform(df_example).select('features')


>>> df_vector.show()
+-------------+
|     features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+

I want to find the Euclidean distance between each vector and a particular cluster center (an array of the same length). Assume the cluster center is:

import numpy as np

cluster_center_0 = np.array([0.6, 0.7, 0.8])
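For a single vector this is just the square root of the summed squared differences; a quick plain-NumPy check for the first row, outside Spark, makes the expected output concrete:

import numpy as np

# expected distance for the first row [1.0, 2.0, 3.0]
v = np.array([1.0, 2.0, 3.0])
cluster_center_0 = np.array([0.6, 0.7, 0.8])
print(np.linalg.norm(v - cluster_center_0))  # ~2.5865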

How do I achieve this for every row in Spark? I tried writing a SQL query, hoping I could access the elements inside the vector using OFFSET and compute the distances from there, but that didn't work. This is the query I used (I have very limited knowledge of SQL):

SELECT aml_cluster_inpt_features
aml_cluster_inpt_features[OFFSET(0)] AS offset_0,
aml_cluster_inpt_features[OFFSET(1)] AS offset_1,
aml_cluster_inpt_features[OFFSET(2)] AS offset_2,
aml_cluster_inpt_features[OFFSET(3)] AS offset_3,
FROM event_rate_holder

Is there a simpler way of doing this? If not, am I headed in the right direction with the SQL query above?

Upvotes: 2

Views: 1127

Answers (1)

Alper t. Turker

Reputation: 35229

Just use a UDF:

import numpy as np

from pyspark.sql.functions import udf
from scipy.spatial import distance

def euclidean(v1):
    # returns a UDF that computes the Euclidean distance from the fixed point v1
    @udf("double")
    def _(v2):
        # cast to a plain Python float so Spark's DoubleType accepts the value
        return float(distance.euclidean(v1, v2)) if v2 is not None else None
    return _


center = np.array([0.6, 0.7, 0.8])

df_vector.withColumn("dist", euclidean(center)("features")).show()
# +-------------+-----------------+
# |     features|             dist|
# +-------------+-----------------+
# |[1.0,2.0,3.0]|2.586503431275513|
# |[4.0,5.0,6.0]|7.555792479945437|
# +-------------+-----------------+
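Since the question asks about SQL specifically: the same logic can be registered as a named UDF and called from a Spark SQL query. A minimal sketch, assuming the DataFrame is exposed as a temp view (the view name event_rate_holder is borrowed from the question; the function name euclidean_from_center is made up here):

# register an equivalent UDF under a name callable from SQL
spark.udf.register(
    "euclidean_from_center",
    lambda v: float(distance.euclidean(center, v)) if v is not None else None,
    "double",
)

df_vector.createOrReplaceTempView("event_rate_holder")
spark.sql("""
    SELECT features,
           euclidean_from_center(features) AS dist
    FROM event_rate_holder
""").show()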

If you want to disassemble vectors into separate columns, see How to split Vector into columns - using PySpark.
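On Spark 3.0+ there is also a built-in helper for this; a minimal sketch using vector_to_array (not available in older versions):

from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array  # Spark 3.0+

# convert the ML vector to a plain array, then expand it into one column per element
df_cols = (
    df_vector
    .withColumn("arr", vector_to_array("features"))
    .select([col("arr")[i].alias(f"element_{i}") for i in range(3)])
)
df_cols.show()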

Upvotes: 2
