F Gh

Reputation: 75

Dot Products of Rows of a Dataframe with a Fixed Vector in Spark

I have a dataframe (df1) with m rows and n columns in Spark. I have another dataframe (df2) with 1 row and n columns. How can I efficiently compute the dot product of each row of df1 with the single row of df2?

Upvotes: 0

Views: 2094

Answers (3)

David

Reputation: 213

Since Python udfs are not very performant, it's probably better to implement this in Scala. But if you want a pure Python implementation, you can try the following hack.

A dot product is equivalent to a linear prediction, so you can leverage LinearRegressionModel.transform if you create a model with the desired coefficients. You can do that by "estimating" a regression whose features are an identity matrix and whose labels are your desired coefficients. See the following implementation:

import numpy as np

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.sql import Row, DataFrame, SparkSession

# The class below builds its coefficient DataFrame through `spark_session`
spark_session = SparkSession.builder.getOrCreate()

class DotProduct:
    _regressors_col = 'regressors'
    _dot_product_col = 'dot_product'
    _coef_col = 'coef'
    _index_col = 'index'

    def __init__(
        self,
        coefficients: np.ndarray
    ):
        self._coefficients = coefficients

    def transform(self, dataframe: DataFrame) -> DataFrame:
        coef_df = self._convert_coefficients_to_dataframe()
        w_identity_df = self._add_identity_matrix_as_regressors(coef_df)

        linear_reg = LinearRegression(
            featuresCol=self._regressors_col,
            labelCol=self._coef_col,
            predictionCol=self._dot_product_col,
            fitIntercept=False,
            standardization=False
        )
        model = linear_reg.fit(w_identity_df)

        return model.transform(dataset=dataframe)

    def _convert_coefficients_to_dataframe(self) -> DataFrame:
        rows = [
            Row(**{self._index_col: index, self._coef_col: float(coef)})
            for index, coef
            in enumerate(self._coefficients)
        ]
        return spark_session.createDataFrame(rows)

    def _add_identity_matrix_as_regressors(
            self,
            coefficients_df: DataFrame
    ) -> DataFrame:
        encoder = OneHotEncoder(
            inputCols=[self._index_col],
            outputCols=[self._regressors_col],
            dropLast=False
        )
        model = encoder.fit(coefficients_df)
        return model.transform(coefficients_df)
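
A hedged usage sketch (assuming the class above, and that df1's numeric columns are first assembled into a 'regressors' vector column to match the featuresCol used in the fit):

from pyspark.ml.feature import VectorAssembler

# Hypothetical input: three numeric columns per row
df1 = spark_session.createDataFrame(
    [(1.0, 2.0, 3.0), (4.0, 5.0, 6.0)], ['v1', 'v2', 'v3'])

# Assemble the columns into the 'regressors' vector column expected by transform()
assembler = VectorAssembler(inputCols=['v1', 'v2', 'v3'], outputCol='regressors')
df1_vec = assembler.transform(df1)

# Dot each row with the fixed vector [3, 2, 1]; the result lands in 'dot_product'
dot = DotProduct(np.array([3.0, 2.0, 1.0]))
dot.transform(df1_vec).select('dot_product').show()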

Upvotes: 0

sajjad

Reputation: 379

You can form a matrix from the first data frame and another matrix from the second data frame and multiply them. Here is a code snippet (I'm using a block matrix here, since I assume your data frame cannot be stored on your local machine):


from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

v = [('a', [1, 2, 3]),
     ('b', [4, 5, 6]),
     ('c', [9, 8, 7])]
df1 = spark.createDataFrame(v, ['id', 'vec'])
df2 = spark.createDataFrame([('d', [3, 2, 1])], ['id', 'vec'])

# Convert each DataFrame to a distributed row matrix
matdf1 = IndexedRowMatrix(df1.rdd.zipWithIndex().map(lambda r: IndexedRow(r[1], Vectors.dense(r[0]['vec']))))
matdf2 = IndexedRowMatrix(df2.rdd.zipWithIndex().map(lambda r: IndexedRow(r[1], Vectors.dense(r[0]['vec']))))

m1 = matdf1.toBlockMatrix(100, 100)
m2 = matdf2.toBlockMatrix(100, 100)

# Each entry of the product is the dot product of a row of df1 with the row of df2
dot_products = m1.multiply(m2.transpose())
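
The product is itself a distributed BlockMatrix. A minimal sketch for pulling the m x 1 result back to the driver (assuming it fits in driver memory):

# Collect the result as a local matrix for inspection
print(dot_products.toLocalMatrix())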

Upvotes: 0

niuer

Reputation: 1669

We can use VectorAssembler to compute the dot product.

  1. Create sample DataFrames:
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *

v = [('a', 1,2,3),
    ('b', 4,5,6),
    ('c', 9,8,7)]
df1 = spark.createDataFrame(v, ['id', 'v1', 'v2', 'v3'])
df2 = spark.createDataFrame([('d',3,2,1)], ['id', 'v1', 'v2', 'v3'])
df1.show()
df2.show()

They look like this:

+---+---+---+---+
| id| v1| v2| v3|
+---+---+---+---+
|  a|  1|  2|  3|
|  b|  4|  5|  6|
|  c|  9|  8|  7|
+---+---+---+---+

+---+---+---+---+
| id| v1| v2| v3|
+---+---+---+---+
|  d|  3|  2|  1|
+---+---+---+---+

  2. Use VectorAssembler to convert the columns to Vector
vecAssembler = VectorAssembler(inputCols=["v1", "v2", "v3"], outputCol="values")
dfv1 = vecAssembler.transform(df1) 
dfv2 = vecAssembler.transform(df2)
dfv1.show()
dfv2.show()

Now they look like this:

+---+---+---+---+-------------+
| id| v1| v2| v3|       values|
+---+---+---+---+-------------+
|  a|  1|  2|  3|[1.0,2.0,3.0]|
|  b|  4|  5|  6|[4.0,5.0,6.0]|
|  c|  9|  8|  7|[9.0,8.0,7.0]|
+---+---+---+---+-------------+

+---+---+---+---+-------------+
| id| v1| v2| v3|       values|
+---+---+---+---+-------------+
|  d|  3|  2|  1|[3.0,2.0,1.0]|
+---+---+---+---+-------------+

  3. Define a udf to do the dot product
# Get the fixed vector from DataFrame dfv2
vm = Vectors.dense(dfv2.take(1)[0]['values'])

dot_prod_udf = F.udf(lambda v: float(v.dot(vm)), FloatType())
dfv1 = dfv1.withColumn('dot_prod', dot_prod_udf('values'))

dfv1.show()

The final result is:

+---+---+---+---+-------------+--------+
| id| v1| v2| v3|       values|dot_prod|
+---+---+---+---+-------------+--------+
|  a|  1|  2|  3|[1.0,2.0,3.0]|    10.0|
|  b|  4|  5|  6|[4.0,5.0,6.0]|    28.0|
|  c|  9|  8|  7|[9.0,8.0,7.0]|    50.0|
+---+---+---+---+-------------+--------+
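
Since Python udfs carry serialization overhead (as David's answer notes), a udf-free sketch of the same computation is also possible: weight each column by the matching entry of the fixed vector and sum, entirely with Spark SQL expressions. The column names and weights below match the sample data:

# Weight each original column and sum the products (no Python udf involved)
weights = dict(zip(['v1', 'v2', 'v3'], [3.0, 2.0, 1.0]))
dot_expr = sum(F.col(c) * w for c, w in weights.items())
df1.withColumn('dot_prod', dot_expr).show()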

Upvotes: 3
