user299791

Reputation: 2070

dataframe filter gives NullPointerException

In Spark 1.6.0 I have a data frame with a column that holds a job description, like:

Description
bartender
bartender
employee
taxi-driver
...

I retrieve a list of unique values from that column with:

val jobs = people.select("Description").distinct().rdd.map(r => r(0).asInstanceOf[String]).repartition(4)

I then try, for each job description, to retrieve people with that job and do something, but I get a NullPointerException:

jobs.foreach { 
  ajob => 
   var peoplewithjob = people.filter($"Description" === ajob)
   // ... do stuff
}

I don't understand why this happens: every job was extracted from the people data frame, so there should be at least one person with each job... any hint is more than welcome! Here's the stack trace:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 206, localhost): java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:799)
at jago.Run$$anonfun$main$1.apply(Run.scala:89)
at jago.Run$$anonfun$main$1.apply(Run.scala:82)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Upvotes: 5

Views: 4555

Answers (1)

zero323

Reputation: 330453

It happens because Spark doesn't support nested actions or transformations: the people DataFrame (and its SQLContext) is not available inside a function that runs on the executors, which is why filter throws a NullPointerException there. If you want to operate on distinct values extracted from the DataFrame, you have to fetch them to the driver and iterate locally:

// or jobs.toLocalIterator
jobs.collect().foreach { ajob =>
  val peoplewithjob = people.filter($"Description" === ajob)
  // ... do stuff
}
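
As a side note on the choice between the two: collect() pulls all distinct descriptions into driver memory at once, while toLocalIterator() streams them one partition at a time, which can matter if the number of distinct jobs is very large.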

Depending on what kind of transformations you apply as "do stuff", it can be a better idea to simply groupBy and aggregate:

people.groupBy($"Description").agg(...)    
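
For example, if "do stuff" boils down to computing something per job, a single aggregation does it in one pass over the data instead of one filter per job. A minimal sketch, assuming the count is just a placeholder for whatever aggregate you actually need:

import org.apache.spark.sql.functions.{count, lit}

// One output row per distinct Description with the number of people holding that job
val perJob = people.groupBy($"Description").agg(count(lit(1)).as("people_count"))
perJob.show()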

Upvotes: 7
