Kareem Alhazred

Reputation: 11

Error thrown when using BlockMatrix.add

I'm attempting to use the distributed matrix data structure BlockMatrix (Spark 1.5.0, Scala) and am running into an issue when adding two block matrices together (error attached below).

I'm constructing the two matrices by creating a collection of MatrixEntry objects, putting them into a CoordinateMatrix (specifying nRows and nCols), and then using the CoordinateMatrix routine toBlockMatrix(Rpb, Cpb). For both matrices the Rpb/Cpb values are the same.

Unfortunately, when attempting to use the BlockMatrix.add routine, I get:

15/11/04 10:17:27 ERROR executor.Executor: Exception in task 0.0 in stage 11.0 (TID 30)
java.lang.IllegalArgumentException: requirement failed: The last value of colPtrs must equal the number of elements. values.length: 9164, colPtrs.last: 5118
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:373)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:400)
    at org.apache.spark.mllib.linalg.Matrices$.fromBreeze(Matrices.scala:701)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:321)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:310)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

15/11/04 10:17:27 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 11.0 (TID 32, localhost, PROCESS_LOCAL, 2171 bytes)
15/11/04 10:17:27 INFO executor.Executor: Running task 2.0 in stage 11.0 (TID 32)
15/11/04 10:17:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 11.0 (TID 30, localhost): java.lang.IllegalArgumentException: requirement failed: The last value of colPtrs must equal the number of elements. values.length: 9164, colPtrs.last: 5118
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:373)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:400)
    at org.apache.spark.mllib.linalg.Matrices$.fromBreeze(Matrices.scala:701)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:321)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:310)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

15/11/04 10:17:27 ERROR scheduler.TaskSetManager: Task 0 in stage 11.0 failed 1 times; aborting job
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 4 blocks
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
15/11/04 10:17:27 INFO scheduler.TaskSchedulerImpl: Cancelling stage 11
15/11/04 10:17:27 INFO executor.Executor: Executor is trying to kill task 1.0 in stage 11.0 (TID 31)
15/11/04 10:17:27 INFO scheduler.TaskSchedulerImpl: Stage 11 was cancelled
15/11/04 10:17:27 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 11.0 (TID 32)
15/11/04 10:17:27 INFO scheduler.DAGScheduler: Stage 11 (map at kmv.scala:26) failed in 0.114 s
15/11/04 10:17:27 INFO executor.Executor: Executor killed task 2.0 in stage 11.0 (TID 32)
15/11/04 10:17:27 INFO scheduler.DAGScheduler: Job 2 failed: reduce at CoordinateMatrix.scala:143, took 6.046350 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 30, localhost): java.lang.IllegalArgumentException: requirement failed: The last value of colPtrs must equal the number of elements. values.length: 9164, colPtrs.last: 5118
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:373)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:400)
    at org.apache.spark.mllib.linalg.Matrices$.fromBreeze(Matrices.scala:701)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:321)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:310)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/11/04 10:17:27 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 11.0 (TID 32, localhost): TaskKilled (killed intentionally)
15/11/04 10:17:27 INFO executor.Executor: Executor killed task 1.0 in stage 11.0 (TID 31)
15/11/04 10:17:27 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 11.0 (TID 31, localhost): TaskKilled (killed intentionally)
15/11/04 10:17:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool

Edit: Below is a small reproducer (not the original code) for the problem. It fails on at least 1.3.0 and 1.5.0. Is there a dense+sparse interaction I'm not understanding?

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.{SparkConf, SparkContext}

object runAddBug {
  def main(args: Array[String]) {
    val appName = "addbug"
    val conf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(conf)

    val N = 200   // matrix dimension
    val Nbx = 10  // blocks per dimension

    // A: the N x N identity matrix, built from its diagonal entries.
    val diags = sc.parallelize(0 until N)
    val Eyecm = new CoordinateMatrix(diags.map{d =>
      MatrixEntry(d, d, 1.0)}, N, N)

    // B: a fully dense N x N matrix with B(i, j) = i * j.
    val diagsq = diags.cartesian(diags)
    val Bcm = new CoordinateMatrix(diagsq.map{dd =>
      MatrixEntry(dd._1, dd._2, 1.0 * dd._1 * dd._2)}, N, N)

    // Both matrices use the same block sizes.
    val Rpb = N / Nbx
    val Cpb = N / Nbx

    val A = Eyecm.toBlockMatrix(Rpb, Cpb)
    val B = Bcm.toBlockMatrix(Rpb, Cpb)

    // This add is what triggers the IllegalArgumentException above.
    val C = A.add(B)

    val entsOut = C.toCoordinateMatrix().entries
    val outLines = entsOut.map{anent =>
      anent.i.toString + "\t" + anent.j.toString + "\t" + anent.value.toString
    }
    outLines.saveAsTextFile("Outdata")
  }
}

Thanks for any help!

Upvotes: 1

Views: 368

Answers (1)

elghoto

Reputation: 303

This is definitely a bug. It's caused in spark.mllib.linalg.Matrices.fromBreeze when it converts the result of an operation performed on two Breeze CSC sparse matrices.
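
For context, the requirement that fails is the basic CSC (compressed sparse column) invariant: column j's entries occupy positions colPtrs(j) until colPtrs(j+1) of rowIndices/values, so colPtrs.last must equal values.length. Here is a minimal sketch of a valid CSC matrix, built with the same Matrices.sparse factory used in the test below:

import org.apache.spark.mllib.linalg.Matrices

// 2x3 matrix with both nonzeros in column 0:
//   (2, 0, 0)
//   (2, 0, 0)
val m = Matrices.sparse(
  2, 3,               // numRows, numCols
  Array(0, 2, 2, 2),  // colPtrs: last entry (2) == values.length
  Array(0, 1),        // rowIndices, listed column by column
  Array(2.0, 2.0))    // values
// The failure in the question means fromBreeze rebuilt a SparseMatrix whose
// values array held 9164 entries while colPtrs.last claimed 5118.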

Here is some Scala code that reproduces the bug. I hope it gets patched soon (tested with Spark 2.3).

  test("breeze conversion bug") {
    // (2, 0, 0)
    // (2, 0, 0)
    val mat1Brz = Matrices.sparse(2, 3, Array(0, 2, 2, 2), Array(0, 1), Array(2, 2)).asBreeze
    // (2, 1E-15, 1E-15)
    // (2, 1E-15, 1E-15
    val mat2Brz = Matrices.sparse(2, 3, Array(0, 2, 4, 6), Array(0, 0, 0, 1, 1, 1), Array(2, 1E-15, 1E-15, 2, 1E-15, 1E-15)).asBreeze
    // The following shouldn't break
    val t01 = mat1Brz - mat1Brz
    val t02 = mat2Brz - mat2Brz
    val t02Brz = Matrices.fromBreeze(t02)
    val t01Brz = Matrices.fromBreeze(t01)

    val t1Brz = mat1Brz - mat2Brz
    val t2Brz = mat2Brz - mat1Brz
    // The following ones should break
    val t1 = Matrices.fromBreeze(t1Brz)
    val t2 = Matrices.fromBreeze(t2Brz)

  }
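
Until it's fixed, one possible workaround is to do the addition at the CoordinateMatrix level, so BlockMatrix.add (and therefore fromBreeze) is never invoked. A rough sketch with a hypothetical helper, assuming both matrices have the same dimensions:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Sums two CoordinateMatrix'es entry by entry with a union + reduceByKey.
def addCoordinate(a: CoordinateMatrix, b: CoordinateMatrix): CoordinateMatrix = {
  val summed = a.entries.union(b.entries)
    .map(e => ((e.i, e.j), e.value))                  // key each entry by coordinate
    .reduceByKey(_ + _)                               // sum overlapping entries
    .map { case ((i, j), v) => MatrixEntry(i, j, v) }
  new CoordinateMatrix(summed, a.numRows(), a.numCols())
}

You can then call toBlockMatrix(Rpb, Cpb) on the summed matrix as before.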

Upvotes: 1
