Reputation: 11
I'm attempting to use the distributed matrix data structure BlockMatrix (Spark 1.5.0, Scala) and am running into problems when adding two block matrices together (error attached below).
I'm constructing the two matrices by creating a collection of MatrixEntry objects, putting them into a CoordinateMatrix (specifying Nrows and Ncols), and then using the CoordinateMatrix method toBlockMatrix(Rpb, Cpb). Both matrices use the same Rpb and Cpb values.
Unfortunately, when I call the BlockMatrix.add routine I get:
15/11/04 10:17:27 ERROR executor.Executor: Exception in task 0.0 in stage 11.0 (TID 30)
java.lang.IllegalArgumentException: requirement failed: The last value of colPtrs must equal the number of elements. values.length: 9164, colPtrs.last: 5118
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:373)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:400)
    at org.apache.spark.mllib.linalg.Matrices$.fromBreeze(Matrices.scala:701)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:321)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:310)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/11/04 10:17:27 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 11.0 (TID 32, localhost, PROCESS_LOCAL, 2171 bytes)
15/11/04 10:17:27 INFO executor.Executor: Running task 2.0 in stage 11.0 (TID 32)
15/11/04 10:17:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 11.0 (TID 30, localhost): java.lang.IllegalArgumentException: requirement failed: The last value of colPtrs must equal the number of elements. values.length: 9164, colPtrs.last: 5118
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:373)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:400)
    at org.apache.spark.mllib.linalg.Matrices$.fromBreeze(Matrices.scala:701)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:321)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:310)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/11/04 10:17:27 ERROR scheduler.TaskSetManager: Task 0 in stage 11.0 failed 1 times; aborting job
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Getting 4 non-empty blocks out of 4 blocks
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 4 blocks
15/11/04 10:17:27 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
15/11/04 10:17:27 INFO scheduler.TaskSchedulerImpl: Cancelling stage 11
15/11/04 10:17:27 INFO executor.Executor: Executor is trying to kill task 1.0 in stage 11.0 (TID 31)
15/11/04 10:17:27 INFO scheduler.TaskSchedulerImpl: Stage 11 was cancelled
15/11/04 10:17:27 INFO executor.Executor: Executor is trying to kill task 2.0 in stage 11.0 (TID 32)
15/11/04 10:17:27 INFO scheduler.DAGScheduler: Stage 11 (map at kmv.scala:26) failed in 0.114 s
15/11/04 10:17:27 INFO executor.Executor: Executor killed task 2.0 in stage 11.0 (TID 32)
15/11/04 10:17:27 INFO scheduler.DAGScheduler: Job 2 failed: reduce at CoordinateMatrix.scala:143, took 6.046350 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 30, localhost): java.lang.IllegalArgumentException: requirement failed: The last value of colPtrs must equal the number of elements. values.length: 9164, colPtrs.last: 5118
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:373)
    at org.apache.spark.mllib.linalg.SparseMatrix.<init>(Matrices.scala:400)
    at org.apache.spark.mllib.linalg.Matrices$.fromBreeze(Matrices.scala:701)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:321)
    at org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$5.apply(BlockMatrix.scala:310)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/11/04 10:17:27 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 11.0 (TID 32, localhost): TaskKilled (killed intentionally)
15/11/04 10:17:27 INFO executor.Executor: Executor killed task 1.0 in stage 11.0 (TID 31)
15/11/04 10:17:27 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 11.0 (TID 31, localhost): TaskKilled (killed intentionally)
15/11/04 10:17:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool
Edit: Below is a small reproducer (not the original code) for the problem. It fails on at least Spark 1.3.0 and 1.5.0. Is there a dense+sparse issue I'm not understanding?
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.{SparkConf, SparkContext}

object runAddBug {
  def main(args: Array[String]) {
    val appName = "addbug"
    val conf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(conf)

    val N = 200
    val Nbx = 10

    // A: the N x N identity matrix, built from its diagonal entries
    val diags = sc.parallelize(0 to N - 1)
    val Eyecm = new CoordinateMatrix(diags.map { d =>
      MatrixEntry(d, d, 1.0)
    }, N, N)

    // B: a fully dense N x N matrix with B(i, j) = i * j
    val diagsq = diags.cartesian(diags)
    val Bcm = new CoordinateMatrix(diagsq.map { dd =>
      MatrixEntry(dd._1, dd._2, 1.0 * dd._1 * dd._2)
    }, N, N)

    // Both block matrices use the same rows/columns per block
    val Rpb = N / Nbx
    val Cpb = N / Nbx
    val A = Eyecm.toBlockMatrix(Rpb, Cpb)
    val B = Bcm.toBlockMatrix(Rpb, Cpb)

    // This is the call that fails
    val C = A.add(B)

    // Write the result as "i <tab> j <tab> value" lines
    val entsOut = C.toCoordinateMatrix().entries
    val outLines = entsOut.map { anent =>
      anent.i.toString + "\t" + anent.j.toString + "\t" + anent.value.toString
    }
    outLines.saveAsTextFile("Outdata")
  }
}
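Edit 2: For what it's worth, I can sidestep the failure by forcing the blocks to be dense before the add, at the obvious memory cost. A minimal sketch, assuming densifying every block is acceptable (the helper name densify is mine):
import org.apache.spark.mllib.linalg.{Matrix, SparseMatrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// Convert every sparse block to a dense one, so that add() works on dense
// Breeze matrices and never converts a sparse Breeze result back to Spark.
// Memory cost: every block is materialized densely.
def densify(m: BlockMatrix): BlockMatrix =
  new BlockMatrix(
    m.blocks.mapValues {
      case sm: SparseMatrix => sm.toDense
      case dm => dm
    },
    m.rowsPerBlock, m.colsPerBlock, m.numRows, m.numCols)

val C = densify(A).add(densify(B))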
Thanks for any help!
Upvotes: 1
Views: 368
Reputation: 303
This is definitely a bug, and it's caused by org.apache.spark.mllib.linalg.Matrices.fromBreeze when it converts the result of an operation performed on two Breeze CSC sparse matrices.
Here is some Scala code that reproduces the bug (tested with Spark 2.3). I hope it gets patched soon.
test("breeze conversion bug") {
// (2, 0, 0)
// (2, 0, 0)
val mat1Brz = Matrices.sparse(2, 3, Array(0, 2, 2, 2), Array(0, 1), Array(2, 2)).asBreeze
// (2, 1E-15, 1E-15)
// (2, 1E-15, 1E-15
val mat2Brz = Matrices.sparse(2, 3, Array(0, 2, 4, 6), Array(0, 0, 0, 1, 1, 1), Array(2, 1E-15, 1E-15, 2, 1E-15, 1E-15)).asBreeze
// The following shouldn't break
val t01 = mat1Brz - mat1Brz
val t02 = mat2Brz - mat2Brz
val t02Brz = Matrices.fromBreeze(t02)
val t01Brz = Matrices.fromBreeze(t01)
val t1Brz = mat1Brz - mat2Brz
val t2Brz = mat2Brz - mat1Brz
// The following ones should break
val t1 = Matrices.fromBreeze(t1Brz)
val t2 = Matrices.fromBreeze(t2Brz)
}
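Until it's fixed, one possible workaround is to trim the Breeze arrays down to colPtrs.last before handing them to Spark. A minimal sketch, assuming the entries past colPtrs.last are just unused builder capacity (the helper name fromBreezeSafe is mine, not a Spark API):
import java.util.Arrays
import breeze.linalg.{CSCMatrix => BrzCSC}
import org.apache.spark.mllib.linalg.SparseMatrix

// Build a Spark SparseMatrix from a Breeze CSC matrix, copying only the
// first colPtrs.last entries of rowIndices/data, which are the ones in use.
def fromBreezeSafe(sm: BrzCSC[Double]): SparseMatrix =
  new SparseMatrix(sm.rows, sm.cols, sm.colPtrs,
    Arrays.copyOf(sm.rowIndices, sm.colPtrs.last),
    Arrays.copyOf(sm.data, sm.colPtrs.last))
The Matrices.fromBreeze calls above can then be replaced with fromBreezeSafe wherever the Breeze result is a CSCMatrix.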
Upvotes: 1