Reputation: 31
I was trying to implement Naive Bayes on Spark, but it doesn't work well.
It is probably because of this line in the temp function: each_prob.map(_.take(1))
If you have any idea what the problem is, please help me.
This is my main function:
object DoNaive extends App {

  val file_pathes =
    Vector(
      ("plus",  "resource/doc1.txt"),
      ("plus",  "resource/doc2.txt"),
      ("plus",  "resource/doc3.txt"),
      ("minus", "resource/doc4.txt"),
      ("minus", "resource/doc5.txt"),
      ("minus", "resource/doc6.txt")
    )

  val pn = ParallelNaive(file_pathes)
  val cached_rdd = read.rdds("resource/examine.txt")

  val each_prob: Vector[RDD[String]] =
    pn.allClassNames.map { class_name =>
      cached_rdd.map { elt => (pn.eachProbWord(elt._1, class_name) * elt._2).toString }
    }
  val head_prob = each_prob.head

  println(pn.docs.map(elt => elt._2.take(1).head))
  pn.temp("resource/examine.txt")
}
And this is the ParallelNaive class; the temp function is there only to isolate the problem:
case class ParallelNaive(file_pathes: Vector[(String, String)]) extends Serializable {

  val docs: Vector[(String, RDD[(String, Int)])] =
    file_pathes.map(class_path => (class_path._1, read.rdds(class_path._2, false)))

  val wholeClass: Map[String, Vector[(String, RDD[(String, Int)])]] = docs.groupBy(elt => elt._1)
  val allClassNames: Vector[String] = wholeClass.map(elt => elt._1).toVector
  val eachNumDocs: Map[String, Int] = wholeClass.map(elt => (elt._1, elt._2.length))
  val sumNumDocs: Int = docs.size
  val NumClass = wholeClass.size

  def eachNumWord(word: String, class_name: String): Int = {
    var doc_count = 0
    wholeClass(class_name).foreach { class_rdd => // == (class, rdd)
      val filtered = class_rdd._2.filter(word_occur => word_occur._1 == word).take(1)
      if (filtered.size != 0) doc_count += 1
    }
    doc_count
  }

  def eachProbWord(word: String, class_name: String, alpha: Int = 2): Double = {
    val Nwc = eachNumWord(word, class_name).toDouble
    val Nc = eachNumDocs(class_name).toDouble
    log((Nwc + (alpha - 1)) / (Nc + 2 * (alpha - 1)))
  }

  def eachProbClass(class_name: String): Double = {
    val Nc = eachNumDocs(class_name).toDouble
    log((Nc + 1) / (sumNumDocs + NumClass))
  }

  def temp(doc_path: String) = {
    val cached_rdd = read.rdds(doc_path)
    val each_prob: Vector[RDD[Double]] =
      allClassNames.map { class_name =>
        cached_rdd.map { elt => eachProbWord(elt._1, class_name) * elt._2 }
      }
    each_prob.map(_.take(1))
  }

  def classify(doc_path: String, alpha: Int = 2) = {
    val cached_rdd = read.rdds(doc_path) // cached because it is used many times
    val ProbPerClass =
      allClassNames.map { class_name =>
        val each_prob =
          cached_rdd.map { elt => eachProbWord(elt._1, class_name, alpha) * elt._2 }
        val sum_prob: Double = each_prob.reduce { (a, b) => a + b }
        sum_prob + eachProbClass(class_name)
      }
    // list of probabilities that this document would belong to each class
    println(" max_class---------------------------------------------")
    println("ProbPerClass : " + ProbPerClass)
    val max_class: (Double, Int) = ProbPerClass.zipWithIndex.max
    // (probability, index of the class)
    println(" return estimation class---------------------------------------------")
    allClassNames(max_class._2)
  }
}
I got an error like this:
14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Adding task set 16.0 with 1 tasks
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Starting task 16.0:0 as TID 15 on executor localhost: localhost (PROCESS_LOCAL)
14/04/06 13:55:50 INFO scheduler.TaskSetManager: Serialized task 16.0:0 as 2941 bytes in 0 ms
14/04/06 13:55:50 INFO executor.Executor: Running task ID 15
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_7 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_0 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_1 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_2 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_3 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_4 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block broadcast_5 locally
14/04/06 13:55:50 INFO storage.BlockManager: Found block rdd_57_0 locally
eachProbWord---------------------------------------------
eachNumWord---------------------------------------------
14/04/06 13:55:50 ERROR executor.Executor: Exception in task ID 15
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Lost TID 15 (task 16.0:0)
14/04/06 13:55:50 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:38)
at supervised.ParallelNaive$$anonfun$eachNumWord$1.apply(ParallelNaive.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at supervised.ParallelNaive.eachNumWord(ParallelNaive.scala:36)
at supervised.ParallelNaive.eachProbWord(ParallelNaive.scala:50)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at supervised.ParallelNaive$$anonfun$7$$anonfun$apply$1.apply(ParallelNaive.scala:84)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:844)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
14/04/06 13:55:50 ERROR scheduler.TaskSetManager: Task 16.0:0 failed 1 times; aborting job
14/04/06 13:55:50 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 16.0 from pool
14/04/06 13:55:50 INFO scheduler.DAGScheduler: Failed to run take at ParallelNaive.scala:89
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
org.apache.spark.SparkException: Job aborted: Task 16.0:0 failed 1 times (most recent failure: Exception failure: java.lang.NullPointerException)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/04/06 13:55:50 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 47 s, completed 2014/04/06 13:55:50
Upvotes: 3
Views: 2655
Reputation: 31523
Well, it's a bit difficult to understand what's going on, because the code is badly formatted and there is a huge amount of it for something that could be done in just a few lines. But anyway, I think we can narrow the problem down to this line of code:
val filtered = class_rdd._2.filter{word_occur=> word_occur._1==word}.take(1)
which would be much more readable if written:
val filtered = classRdd._2.filter(_._1 == word).take(1)
Anyway, you're getting an NPE, so your RDD class_rdd contains a null tuple. This is not a problem with Spark, but a problem with your code.
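One thing I would also look at: eachProbWord (and through it eachNumWord) ends up calling filter and take on the class RDDs from inside cached_rdd.map, i.e. inside a closure that runs on the executors, and RDD operations are only safe on the driver. Below is a rough sketch of how you could precompute the per-class document frequencies as plain Scala maps up front, so that only one RDD is touched per job. The object and method names (NaiveBayesSketch, docFrequencies, probWord, classScore) are mine, and I'm assuming read.rdds returns one RDD[(String, Int)] of (word, count) per document, as in your docs value; treat it as an untested outline, not a drop-in fix:
import org.apache.spark.rdd.RDD
import scala.math.log

object NaiveBayesSketch {

  // Driver-side precomputation: for each class, count in how many of its documents each word occurs.
  // `docs` has the same shape as ParallelNaive.docs: one (className, RDD[(word, count)]) entry per document.
  def docFrequencies(docs: Vector[(String, RDD[(String, Int)])]): Map[String, Map[String, Int]] =
    docs.groupBy(_._1).map { case (className, classDocs) =>
      val wordsPerDoc = classDocs.map { case (_, rdd) => rdd.map(_._1).distinct().collect() }
      val counts = wordsPerDoc.flatten.groupBy(identity).map { case (word, occ) => word -> occ.size }
      className -> counts
    }

  // Smoothed log P(word | class), computed from plain Scala maps only, so it is safe inside RDD closures.
  // `numDocs` plays the role of your eachNumDocs.
  def probWord(docFreq: Map[String, Map[String, Int]], numDocs: Map[String, Int],
               word: String, className: String, alpha: Int = 2): Double = {
    val nwc = docFreq(className).getOrElse(word, 0).toDouble
    val nc  = numDocs(className).toDouble
    log((nwc + (alpha - 1)) / (nc + 2 * (alpha - 1)))
  }

  // Score one class for a new document: a single RDD job, no nested RDD access.
  def classScore(examine: RDD[(String, Int)], docFreq: Map[String, Map[String, Int]],
                 numDocs: Map[String, Int], className: String): Double =
    examine.map { case (word, n) => probWord(docFreq, numDocs, word, className) * n }
           .fold(0.0)(_ + _)
}
With the maps in hand, classify reduces to mapping over allClassNames, adding the class prior (your eachProbClass) to each classScore, and taking the max; the temp plumbing goes away.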
Upvotes: 1