PTDS
PTDS

Reputation: 227

Large matrix operations: Multiplication in Scala/Apache Spark

I need to multiply two large matrices, X and Y. Typically X has ~500K rows and ~18K columns and Y has ~18K rows and ~18K columns. The matrix X is expected to be sparse and the matrix Y is expected to be sparse/dense. What is the ideal way of performing this multiplication in Scala/Apache Spark?

Upvotes: 1

Views: 7815

Answers (1)

Tesseract
Tesseract

Reputation: 8139

I got some code for you. It represents a matrix as an array of column vectors (which means each entry in the array is a column, not a row). It takes about 0.7s to multiply two 1000*1000 matrices. 11 minutes for two 10,000 * 10,000 matrices. 1.5 hours for 20,000 * 20,000 and 30 hours for (500k*18k) times (18k*18k). But if you run it in parallel (by using the code that's commented out) it should run about 2 to 3 times faster (on a 4 core cpu). But remember that the number of columns in the first matrix always has to be the same as the number of rows in the second.

class Matrix(val columnVectors: Array[Array[Double]]) {
  val columns = columnVectors.size
  val rows = columnVectors.head.size
  def *(v: Array[Double]): Array[Double] = {
    val newValues = Array.ofDim[Double](rows)
    var col = 0
    while(col < columns) {
      val n = v(col)
      val column = columnVectors(col)
      var row = 0
      while(row < newValues.size) {
        newValues(row) += column(row) * n
        row += 1
      }
      col += 1
    }
    newValues
  }
  def *(other: Matrix): Matrix = {
    //do the calculation on only one cpu
    new Matrix(other.columnVectors.map(col => this * col))

    //do the calculation in parallel on all available cpus
    //new Matrix(other.columnVectors.par.map(col => this * col).toArray)
  }
  override def toString = {
    columnVectors.transpose.map(_.mkString(", ")).mkString("\n")
  }
}

edit:

ok, here is a better version. I now store the row vectors in the matrix instead of the column vectors. That makes it easier to optimize the multiplication for the case where the first matrix is sparse. Also I added a lazy version of the matrix multiplication using iterators. Since the first matrix is 500k * 18k = 9 billion numbers, such a lazy version will allow you to do that multiplication without requiring much ram. You just have to create an Iterator that can read the rows lazily e.g. from a data bank and then write the rows from the resulting iterator back.

import scala.collection.Iterator
import scala.util.{Random => rand}

def time[T](descr: String)(f: => T): T = {
  val start = System.nanoTime
  val r = f
  val end = System.nanoTime
  val time = (end - start)/1e6
  println(descr + ": time = " + time + "ms")
  r
}

object Matrix {
  def mulLazy(m1: Iterator[Array[Double]], m2: Matrix): Iterator[Array[Double]] = {
    m1.grouped(8).map { group =>
      group.par.map(m2.mulRow).toIterator
    }.flatten
  }
}

class Matrix(val rowVectors: Array[Array[Double]]) {
  val columns = rowVectors.head.size
  val rows = rowVectors.size

  private def mulRow(otherRow: Array[Double]): Array[Double] = {
    val rowVectors = this.rowVectors
    val result = Array.ofDim[Double](columns)
    var i = 0
    while(i < otherRow.size) {
      val value = otherRow(i)
      if(value != 0) { //optimization for sparse matrix
        val row = rowVectors(i)
        var col = 0
        while(col < result.size) {
          result(col) += value * row(col)
          col += 1
        }
      }
      i += 1
    }
    result
  }

  def *(other: Matrix): Matrix = {
    new Matrix(rowVectors.par.map(other.mulRow).toArray)
  }

  def equals(other: Matrix): Boolean = {
    java.util.Arrays.deepEquals(this.rowVectors.asInstanceOf[Array[Object]], other.rowVectors.asInstanceOf[Array[Object]])
  }

  override def equals(other: Any): Boolean = {
    if(other.isInstanceOf[Matrix]) equals(other.asInstanceOf[Matrix]) else false
  }

  override def toString = {
    rowVectors.map(_.mkString(", ")).mkString("\n")
  }
}

def randMatrix(rows: Int, columns: Int): Matrix = {  
  new Matrix((1 to rows).map(_ => Array.fill(columns)(rand.nextDouble * 100)).toArray)
}

def sparseRandMatrix(rows: Int, columns: Int, ratio: Double): Matrix = {
  new Matrix((1 to rows).map(_ => Array.fill(columns)(if(rand.nextDouble > ratio) 0 else rand.nextDouble * 100)).toArray)
}

val N = 2000

val m1 = sparseRandMatrix(N, N, 0.1) // only 10% of the numbers will be different from 0
val m2 = randMatrix(N, N)

val m3 = m1.rowVectors.toIterator

val m12 = time("m1 * m2")(m1 * m2)
val m32 = time("m3 * m2")(Matrix.mulLazy(m3, m2)) //doesn't take much time because the matrix multiplication is lazy

println(m32)

println("m12 == m32 = " + (new Matrix(m32.toArray) == m12))

Upvotes: 10

Related Questions