Reputation: 7967
I have a Spark DataFrame on which I am calculating the Euclidean distance between each row and a given set of coordinates. I am recreating a structurally similar DataFrame, 'df_vector', here to explain better.
from pyspark.ml.feature import VectorAssembler
arr = [[1,2,3], [4,5,6]]
df_example = spark.createDataFrame(arr, ['A','B','C'])
assembler = VectorAssembler(inputCols=[x for x in df_example.columns],outputCol='features')
df_vector = assembler.transform(df_example).select('features')
>>> df_vector.show()
+-------------+
|     features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+
>>> df_vector.dtypes
[('features', 'vector')]
As you can see, the features column is a vector. In practice, I get this vector column as the output of a StandardScaler. Anyway, since I need to calculate the Euclidean distance, I do the following:
rdd = df_vector.select('features').rdd.map(lambda r: np.linalg.norm(r-b))
where
b = np.asarray([0.5,1.0,1.5])
I have all the calculations I need, but I need this rdd as a column in df_vector. How do I go about it?
Upvotes: 2
Views: 2184
Reputation: 2696
One way to tackle performance issues might be to use mapPartitions. The idea would be, at a partition level, to convert features to an array and then calculate the norm on the whole array (thus implicitly using numpy vectorisation). Then do some housekeeping to get the form you want. For large datasets this might improve performance.
Here is the function which calculates the norm at partition level:
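import numpy as np
from pyspark.sql import Row
def getnorm(vectors):
    # materialise the partition; np.vstack fails on an empty partition
    rows = list(vectors)
    if not rows:
        return []
    # convert the partition's vectors into one 2-D numpy array
    vec_array = np.vstack([v['features'].toArray() for v in rows])
    # calculate the norm of each row's difference from b
    norm = np.linalg.norm(vec_array - b, axis=1)
    # tidy up to get the norm as a column next to the features
    output = [Row(features=x, norm=y) for x, y in zip(vec_array.tolist(), norm.tolist())]
    return output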
Applying this using mapPartitions gives an RDD of Rows, which can then be converted to a DataFrame:
df_vector.rdd.mapPartitions(getnorm).toDF()
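To see what the partition-level step computes, here is a minimal numpy-only sketch that does not need Spark. It assumes the two example rows from the question and the same b; the name rows is only a stand-in for the vectors in one partition.

```python
import numpy as np

# Target coordinates from the question
b = np.asarray([0.5, 1.0, 1.5])

# Stand-in for one partition's feature vectors (illustrative data)
rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]

# Stack into a (2, 3) array; b broadcasts across the rows
vec_array = np.vstack(rows)

# axis=1 gives one Euclidean norm per row instead of a single matrix norm
norm = np.linalg.norm(vec_array - b, axis=1)
```

Here norm has one entry per row: sqrt(3.5) for the first row and sqrt(48.5) for the second. Without axis=1, np.linalg.norm would return a single number for the whole matrix.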
Upvotes: 1
Reputation: 28322
Instead of creating a new rdd, you could use a UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
norm_udf = udf(lambda r: np.linalg.norm(r - b).tolist(), FloatType())
df_vector.withColumn("norm", norm_udf(df_vector.features))
Make sure numpy is installed and importable on the worker nodes.
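A slightly more explicit variant of the same idea, as a sketch rather than the answer's exact code: the distance logic lives in a plain function so it can be checked without a cluster, and the UDF only wraps it. The names distance_to_b and with_norm_column are hypothetical, and DoubleType is used here in place of FloatType as an assumption to keep full precision. The pyspark imports sit inside the helper only so the distance function can be run where Spark is not installed.

```python
import numpy as np

# Target coordinates from the question
b = np.asarray([0.5, 1.0, 1.5])

def distance_to_b(features):
    """Euclidean distance from one feature vector to b (hypothetical helper)."""
    # Spark ML vectors expose toArray(); fall back to np.asarray for plain lists
    values = features.toArray() if hasattr(features, "toArray") else np.asarray(features)
    # float() turns the numpy scalar into a plain Python float for Spark
    return float(np.linalg.norm(values - b))

def with_norm_column(df, input_col="features", output_col="norm"):
    """Return df with an extra distance column (hypothetical helper)."""
    # Imported lazily so distance_to_b stays usable without pyspark
    from pyspark.sql.functions import udf
    from pyspark.sql.types import DoubleType

    norm_udf = udf(distance_to_b, DoubleType())
    return df.withColumn(output_col, norm_udf(df[input_col]))
```

With the question's data this would be called as with_norm_column(df_vector), which returns a new DataFrame and leaves df_vector unchanged.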
Upvotes: 3