Adeel Hashmi
Adeel Hashmi

Reputation: 827

Spark Machine Learning: RDD becomes unreadable

I am trying to feed a Vector datatype to a mllib function called Word2Vec in Spark. As Word2Vec returns a DataFrame with the "result" column containing the desired Vector, a bit of code is needed. Now finally when the code runs successfully in Spark, I try to use .foreach to println a few of the lines of code. Spark crashes at this step with the following error: NullPointerException. The code will run fine if I remove the println command. I tried using sample method of RDD, but the same Spark error came up. Somehow the RDD has become unreadable.

To understand the background of this ML task, please refer to this link.

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import scala.util.{Success, Try}

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val input_labelled = labelledTweets.map(
      t => (t._1, word2VecModel2.transform(t._2.toDF("text")).select("result").first().getAs[org.apache.spark.ml.linalg.Vector](0)))
      .map(x => new LabeledPoint((x._1).toDouble, x._2)) 

input_labelled.take(3).foreach(println)

documentDF2: org.apache.spark.sql.DataFrame = [value: array<string>]
word2Vec2: org.apache.spark.ml.feature.Word2Vec = w2v_643337d9029a
word2VecModel2: org.apache.spark.ml.feature.Word2VecModel = w2v_643337d9029a
input_labelled: org.apache.spark.rdd.RDD[org.apache.spark.ml.feature.LabeledPoint] = MapPartitionsRDD[52] at map at <console>:74
input1: org.apache.spark.sql.DataFrame = [value: array<string>]
model2: org.apache.spark.ml.linalg.Vector = [9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
[9.573533798832813E-5,-1.8443804499634973E-4,3.803069862805999E-5,-4.663512611061804E-5,1.3393058071633097E-4]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 159, sandbox-hdp.hortonworks.com, executor 1): java.lang.NullPointerException
    at $anonfun$1.apply(<console>:73)
    at $anonfun$1.apply(<console>:73)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
  at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
  ... 58 elided
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:73)
  at $anonfun$1.apply(<console>:73)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

Upvotes: 1

Views: 177

Answers (1)

hoyland
hoyland

Reputation: 1824

In all likelihood, one of your inputs has a null field. Spark evaluates lazily, so until you run take(3), you haven't actually done any computation, which is why you have no error without that line.

Additionally, it would be more typical (and probably faster) to convert the RDD to a DataFrame and then apply the transformer.

Upvotes: 1

Related Questions