Reputation: 75
I have a dataframe (df1) with m rows and n columns in Spark. I have another dataframe (df2) with 1 row and n columns. How can I efficiently compute the dot product of each row of df1 with the single row of df2?
Upvotes: 0
Views: 2094
Reputation: 213
Since Python UDFs are not very performant, it's probably better to implement this in Scala. But if you want a pure Python implementation, you can try the following hack.
A dot product is equivalent to a linear prediction, so you can leverage LinearRegressionModel.transform if you create a model with the desired coefficients. You can do that by "estimating" a regression with an identity matrix as the features and your desired coefficients as the labels. See the following implementation:
import numpy as np
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.sql import DataFrame, Row, SparkSession


class DotProduct:
    _regressors_col = 'regressors'
    _dot_product_col = 'dot_product'
    _coef_col = 'coef'
    _index_col = 'index'

    def __init__(
        self,
        coefficients: np.ndarray
    ):
        self._coefficients = coefficients

    def transform(self, dataframe: DataFrame) -> DataFrame:
        # `dataframe` must contain a vector column named 'regressors'.
        coef_df = self._convert_coefficients_to_dataframe()
        w_identity_df = self._add_identity_matrix_as_regressors(coef_df)
        # No intercept and no standardization, so the fitted weights
        # are exactly the desired coefficients.
        linear_reg = LinearRegression(
            featuresCol=self._regressors_col,
            labelCol=self._coef_col,
            predictionCol=self._dot_product_col,
            fitIntercept=False,
            standardization=False
        )
        model = linear_reg.fit(w_identity_df)
        return model.transform(dataset=dataframe)

    def _convert_coefficients_to_dataframe(self) -> DataFrame:
        # One row per coefficient: (position, value).
        rows = [
            Row(**{self._index_col: index, self._coef_col: float(coef)})
            for index, coef
            in enumerate(self._coefficients)
        ]
        # Assumption: reuse the active session (or create a default one).
        spark_session = SparkSession.builder.getOrCreate()
        return spark_session.createDataFrame(rows)

    def _add_identity_matrix_as_regressors(
        self,
        coefficients_df: DataFrame
    ) -> DataFrame:
        # One-hot encoding the positions 0..n-1 without dropping the last
        # category yields the rows of an n x n identity matrix.
        encoder = OneHotEncoder(
            inputCols=[self._index_col],
            outputCols=[self._regressors_col],
            dropLast=False
        )
        model = encoder.fit(coefficients_df)
        return model.transform(coefficients_df)
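To see why the trick works, here is a small local check with NumPy rather than Spark. It is only a sketch: the example values are mine, and np.linalg.lstsq is a stand-in for whatever solver Spark uses. Regressing the coefficients on an identity matrix with no intercept gives back the coefficients themselves, so the prediction for any row is its dot product with them.

```python
import numpy as np

# Example values (assumed): the single row of df2.
coefficients = np.array([3.0, 2.0, 1.0])

# What one-hot encoding the indices 0..n-1 with dropLast=False amounts to.
identity = np.eye(len(coefficients))

# Least squares without an intercept: solve identity @ beta = coefficients.
beta, *_ = np.linalg.lstsq(identity, coefficients, rcond=None)

# Example rows of df1; a linear prediction is rows @ beta.
rows = np.array([[1.0, 2.0, 3.0],
                 [4.0, 5.0, 6.0],
                 [9.0, 8.0, 7.0]])
predictions = rows @ beta

print(beta)
print(predictions)
```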
Upvotes: 0
Reputation: 379
You can simply form a matrix from the first data frame and another from the second, then multiply them. Here is a code snippet (I'm using a block matrix since I assume your data frame cannot be stored on your local machine):
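from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

v = [('a', [1, 2, 3]),
     ('b', [4, 5, 6]),
     ('c', [9, 8, 7])]
df1 = spark.createDataFrame(v, ['id', 'vec'])
df2 = spark.createDataFrame([('d', [3, 2, 1])], ['id', 'vec'])

# The snippet did not show how matdf1/matdf2 are built; one way (an
# assumption) is an IndexedRowMatrix keyed by each row's position.
def to_indexed_row_matrix(df):
    return IndexedRowMatrix(
        df.rdd.zipWithIndex().map(lambda p: IndexedRow(p[1], p[0]['vec'])))

matdf1 = to_indexed_row_matrix(df1)
matdf2 = to_indexed_row_matrix(df2)
m1 = matdf1.toBlockMatrix(100, 100)
m2 = matdf2.toBlockMatrix(100, 100)
product = m1.multiply(m2.transpose())  # m x 1 BlockMatrix of dot products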
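As a sanity check on the shapes, the same product can be written locally with NumPy. This is only an illustration on the toy data above, not a replacement for the distributed version: an m x n matrix times the transpose of a 1 x n matrix gives an m x 1 column holding one dot product per row.

```python
import numpy as np

m1 = np.array([[1, 2, 3],
               [4, 5, 6],
               [9, 8, 7]])      # rows of df1, shape (3, 3)
m2 = np.array([[3, 2, 1]])      # single row of df2, shape (1, 3)

product = m1 @ m2.T             # shape (3, 1), one dot product per row
print(product.ravel().tolist())  # [10, 28, 50]
```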
Upvotes: 0
Reputation: 1669
We can use VectorAssembler together with a udf to compute the dot product.
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
v = [('a', 1, 2, 3),
     ('b', 4, 5, 6),
     ('c', 9, 8, 7)]
df1 = spark.createDataFrame(v, ['id', 'v1', 'v2', 'v3'])
df2 = spark.createDataFrame([('d',3,2,1)], ['id', 'v1', 'v2', 'v3'])
df1.show()
df2.show()
They look like this:
+---+---+---+---+
| id| v1| v2| v3|
+---+---+---+---+
|  a|  1|  2|  3|
|  b|  4|  5|  6|
|  c|  9|  8|  7|
+---+---+---+---+

+---+---+---+---+
| id| v1| v2| v3|
+---+---+---+---+
|  d|  3|  2|  1|
+---+---+---+---+
Use VectorAssembler to convert the columns to a Vector:
vecAssembler = VectorAssembler(inputCols=["v1", "v2", "v3"], outputCol="values")
dfv1 = vecAssembler.transform(df1)
dfv2 = vecAssembler.transform(df2)
dfv1.show()
dfv2.show()
Now they look like this:
+---+---+---+---+-------------+
| id| v1| v2| v3|       values|
+---+---+---+---+-------------+
|  a|  1|  2|  3|[1.0,2.0,3.0]|
|  b|  4|  5|  6|[4.0,5.0,6.0]|
|  c|  9|  8|  7|[9.0,8.0,7.0]|
+---+---+---+---+-------------+

+---+---+---+---+-------------+
| id| v1| v2| v3|       values|
+---+---+---+---+-------------+
|  d|  3|  2|  1|[3.0,2.0,1.0]|
+---+---+---+---+-------------+
Use a udf to do the dot product:
# Get the fixed vector from DataFrame dfv2
vm = Vectors.dense(dfv2.take(1)[0]['values'])
dot_prod_udf = F.udf(lambda v: float(v.dot(vm)), DoubleType())
dfv1 = dfv1.withColumn('dot_prod', dot_prod_udf('values'))
dfv1.show()
The final result is:
+---+---+---+---+-------------+--------+
| id| v1| v2| v3|       values|dot_prod|
+---+---+---+---+-------------+--------+
|  a|  1|  2|  3|[1.0,2.0,3.0]|    10.0|
|  b|  4|  5|  6|[4.0,5.0,6.0]|    28.0|
|  c|  9|  8|  7|[9.0,8.0,7.0]|    50.0|
+---+---+---+---+-------------+--------+
Upvotes: 3