Reputation: 827
I am trying to feed a Vector datatype to a mllib
function called Word2Vec
in Spark. As Word2Vec
returns a DataFrame
with the "result" column containing the desired Vector, a bit of code is needed. Now finally when the code runs successfully in Spark, I try to use .foreach
to println
a few of the lines of code. Spark crashes at this step with the following error: NullPointerException
. The code will run fine if I remove the println
command. I tried using sample method of RDD, but the same Spark error came up. Somehow the RDD has become unreadable.
To understand the background of this ML task, please refer to this link.
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import scala.util.{Success, Try}
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
val input_labelled = labelledTweets.map(
t => (t._1, word2VecModel2.transform(t._2.toDF("text")).select("result").first().getAs[org.apache.spark.ml.linalg.Vector](0)))
.map(x => new LabeledPoint((x._1).toDouble, x._2))
input_labelled.take(3).foreach(println)
documentDF2: org.apache.spark.sql.DataFrame = [value: array<string>]
word2Vec2: org.apache.spark.ml.feature.Word2Vec = w2v_643337d9029a
word2VecModel2: org.apache.spark.ml.feature.Word2VecModel = w2v_643337d9029a
input_labelled: org.apache.spark.rdd.RDD[org.apache.spark.ml.feature.LabeledPoint] = MapPartitionsRDD[52] at map at <console>:74
input1: org.apache.spark.sql.DataFrame = [value: array<string>]
model2: org.apache.spark.ml.linalg.Vector = [9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
[9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 159, sandbox-hdp.hortonworks.com, executor 1): java.lang.NullPointerException
at $anonfun$1.apply(<console>:73)
at $anonfun$1.apply(<console>:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
... 58 elided
Caused by: java.lang.NullPointerException
at $anonfun$1.apply(<console>:73)
at $anonfun$1.apply(<console>:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
Upvotes: 1
Views: 177
Reputation: 1824
In all likelihood, one of your inputs has a null field. Spark evaluates lazily, so until you run take(3)
, you haven't actually done any computation, which is why you have no error without that line.
Additionally, it would be more typical (and probably faster) to convert the RDD
to a DataFrame
and then apply the transformer.
Upvotes: 1