vortex

Reputation: 99

PySpark RDD Sparse Matrix multiplication from scala to python

I previously posted a question about multiplying a coordinate matrix with 9 million rows and 85K columns: Errors for block matrix multiplication in Spark

However, I ran into an out-of-memory issue on Dataproc. I have tried configuring the cluster with high-memory nodes, but with no luck.

I am reading this article and thought it might help in my case: https://www.balabit.com/blog/scalable-sparse-matrix-multiplication-in-apache-spark/ However, the solution it provides is in Scala, which I am not familiar with. Could someone be kind enough to translate this code to Python? Thanks a bunch!

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

def coordinateMatrixMultiply(leftMatrix: CoordinateMatrix, rightMatrix: CoordinateMatrix): CoordinateMatrix = {
    // Key the left entries by their column j and the right entries by their row j
    val M_ = leftMatrix.entries.map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
    val N_ = rightMatrix.entries.map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

    // Join on j, multiply matching values, then sum per output cell (i, k)
    val productEntries = M_
        .join(N_)
        .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
        .reduceByKey(_ + _)
        .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

    new CoordinateMatrix(productEntries)
}
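To make the Scala logic easier to follow before translating it, here is a minimal pure-Python sketch (no Spark) of the same three steps: key both entry lists by the shared index j, join and multiply, then sum per output cell (i, k). The function name multiply_coo and the list-of-(row, col, value)-tuples input are assumptions for illustration only; in Spark, the dictionary grouping below is what join and reduceByKey do across partitions.

```python
from collections import defaultdict


def multiply_coo(left, right):
    """Multiply two sparse matrices given as lists of (row, col, value) tuples.

    Hypothetical local helper mirroring the Scala join/reduceByKey steps.
    """
    # M_: key the left entries by their column j -> (i, v)
    left_by_j = defaultdict(list)
    for i, j, v in left:
        left_by_j[j].append((i, v))

    # N_: key the right entries by their row j -> (k, w)
    right_by_j = defaultdict(list)
    for j, k, w in right:
        right_by_j[j].append((k, w))

    # "join" on j, emit v * w for cell (i, k), and sum like reduceByKey
    sums = defaultdict(float)
    for j, lefts in left_by_j.items():
        for k, w in right_by_j.get(j, []):
            for i, v in lefts:
                sums[(i, k)] += v * w

    # Back to (i, k, value) entries, like MatrixEntry(i, k, sum)
    return sorted((i, k, s) for (i, k), s in sums.items())


# A = [[1, 2], [0, 3]] and B = [[4, 0], [5, 6]] in coordinate form
A = [(0, 0, 1.0), (0, 1, 2.0), (1, 1, 3.0)]
B = [(0, 0, 4.0), (1, 0, 5.0), (1, 1, 6.0)]
print(multiply_coo(A, B))
# [(0, 0, 14.0), (0, 1, 12.0), (1, 0, 15.0), (1, 1, 18.0)]
```

Only pairs of stored entries that share the same j are ever multiplied, so no dense blocks are built; the join can still grow large when some column of the left matrix and the matching row of the right matrix both hold many nonzeros.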

Upvotes: 0

Views: 1345

Answers (2)

Stefan_EOX

Reputation: 1531

Implementation for Python 3.x

  1. Since Python 3 does not support tuple unpacking in lambda parameters, we have to reference each MatrixEntry through a single variable e.
  2. Also, MatrixEntry is not indexable, so we must access its individual attributes i, j and value.
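
from pyspark.mllib.linalg.distributed import CoordinateMatrix

def coordinateMatrixMultiply(leftmatrix, rightmatrix):
    # Key the left entries by column j and the right entries by row j
    left  =  leftmatrix.entries.map(lambda e: (e.j, (e.i, e.value)))
    right = rightmatrix.entries.map(lambda e: (e.i, (e.j, e.value)))
    # Join on j, multiply matching values, sum per (i, k), flatten to (i, k, sum)
    productEntries = left \
        .join(right) \
        .map(lambda e: ((e[1][0][0], e[1][1][0]), e[1][0][1] * e[1][1][1])) \
        .reduceByKey(lambda x, y: x + y) \
        .map(lambda e: (*e[0], e[1]))
    # CoordinateMatrix accepts an RDD of (i, j, value) tuples
    return CoordinateMatrix(productEntries)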

Upvotes: 1

vortex

Reputation: 99

This is fast as well; you just need to convert the matrices to RDDs of (i, j, value) tuples before plugging them in.

def coordinateMatrixMultiply(leftmatrix, rightmatrix):
    left = leftmatrix.map(lambda (i, j, v): (j, (i, v)))
    right = rightmatrix.map(lambda (j, k, w): (j, (k, w)))
    productEntries = left \
        .join(right) \
        .map(lambda (x, ((i, v), (k, w))): ((i, k), (v * w))) \
        .reduceByKey(lambda x, y: x + y) \
        .map(lambda ((i, k), sum): (i, k, sum))
    return productEntries
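Because tuple parameters in lambdas were removed in Python 3 (PEP 3113), the tuple-based version above only runs under Python 2. A minimal sketch of a Python 3 port, assuming both inputs are already RDDs of (row, col, value) tuples, moves the unpacking into small named functions. The helper names key_left, key_right, multiply_pair and to_entry are hypothetical; the RDD calls (map, join, reduceByKey) are the same ones used above.

```python
from operator import add


def key_left(entry):
    # (i, j, v) -> (j, (i, v)): key the left matrix by its column index
    i, j, v = entry
    return j, (i, v)


def key_right(entry):
    # (j, k, w) -> (j, (k, w)): key the right matrix by its row index
    j, k, w = entry
    return j, (k, w)


def multiply_pair(joined):
    # (j, ((i, v), (k, w))) -> ((i, k), v * w)
    _, ((i, v), (k, w)) = joined
    return (i, k), v * w


def to_entry(cell):
    # ((i, k), total) -> (i, k, total)
    (i, k), total = cell
    return i, k, total


def coordinateMatrixMultiply(leftmatrix, rightmatrix):
    # leftmatrix / rightmatrix: RDDs of (row, col, value) tuples
    return leftmatrix.map(key_left) \
        .join(rightmatrix.map(key_right)) \
        .map(multiply_pair) \
        .reduceByKey(add) \
        .map(to_entry)
```

With an active SparkContext named sc (an assumption), this could be called as coordinateMatrixMultiply(sc.parallelize(A), sc.parallelize(B)).collect(). Module-level functions like these are serialized to the executors just as lambdas are.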

Upvotes: 0
