Clock Slave

Reputation: 7967

PySpark: Convert RDD to column in dataframe

I have a Spark dataframe with which I am calculating the Euclidean distance between each row and a given set of coordinates. I am recreating a structurally similar dataframe 'df_vector' here to explain better.

import numpy as np
from pyspark.ml.feature import VectorAssembler

arr = [[1, 2, 3], [4, 5, 6]]
df_example = spark.createDataFrame(arr, ['A', 'B', 'C'])
assembler = VectorAssembler(inputCols=df_example.columns, outputCol='features')
df_vector = assembler.transform(df_example).select('features')

>>> df_vector.show()
+-------------+
|     features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+

>>> df_vector.dtypes
[('features', 'vector')]

As you can see, the features column is a vector. In practice, I get this vector column as the output of a StandardScaler. Anyway, since I need to calculate the Euclidean distance, I do the following:

rdd = df_vector.select('features').rdd.map(lambda r: np.linalg.norm(r-b))

where

b = np.asarray([0.5,1.0,1.5])

I have all the calculations I need, but I need this rdd as a column in df_vector. How do I go about it?

Upvotes: 2

Views: 2184

Answers (2)

ags29

Reputation: 2696

One way to tackle performance issues is to use mapPartitions. The idea is, at the partition level, to convert the features into a single numpy array and calculate the norms on the whole array at once (thus implicitly using numpy vectorisation), then do some housekeeping to get the output into the form you want. For large datasets this might improve performance.

Here is a function which calculates the norms at the partition level:

import numpy as np
from pyspark.sql import Row

def getnorm(vectors):
    # stack the partition's vectors into a single numpy array
    vec_array = np.vstack([v['features'] for v in vectors])
    # calculate all the norms at once (b is captured from the driver)
    norms = np.linalg.norm(vec_array - b, axis=1)
    # tidy up to get the norm as a column alongside the features
    return [Row(features=x, norm=y) for x, y in zip(vec_array.tolist(), norms.tolist())]

Applying this with mapPartitions gives an RDD of Rows, which can then be converted to a DataFrame:

df_vector.rdd.mapPartitions(getnorm).toDF()
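With the example data and b from the question, this produces something like the output below (norm values are rounded here; note that features comes back as an array column rather than a vector, since getnorm rebuilds the rows from vec_array.tolist()):

>>> df_vector.rdd.mapPartitions(getnorm).toDF().show()
+---------------+--------+
|       features|    norm|
+---------------+--------+
|[1.0, 2.0, 3.0]|1.870829|
|[4.0, 5.0, 6.0]|6.964194|
+---------------+--------+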

Upvotes: 1

Shaido
Shaido

Reputation: 28322

Instead of creating a new rdd, you could use a UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

norm_udf = udf(lambda r: np.linalg.norm(r - b).tolist(), FloatType())
df_vector.withColumn("norm", norm_udf(df_vector.features))

Make sure numpy is available on the worker nodes.
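Put together with the setup from the question, this should give output along the lines below (the digits shown are approximate, since FloatType casts numpy's float64 result down to 32-bit; use DoubleType if you need full precision):

>>> df_vector.withColumn("norm", norm_udf(df_vector.features)).show()
+-------------+---------+
|     features|     norm|
+-------------+---------+
|[1.0,2.0,3.0]|1.8708287|
|[4.0,5.0,6.0]|6.9641943|
+-------------+---------+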

Upvotes: 3
