faustineinsun

Reputation: 451

NullPointerException: creating dataset/dataframe inside foreachPartition/foreach

1) If I use the following code, I get a NullPointerException in both local and cluster mode:

import sparkSession.implicits._
val testDS = sparkSession.createDataFrame(
  Seq(
    ABC("1","2", 1),
    ABC("3","9", 3),
    ABC("8","2", 2),
    ABC("1","2", 3),
    ABC("3","9", 1),
    ABC("2","7", 1),
    ABC("1","3", 2))
).as[ABC]

val t = testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(
    p => p.foreach(
      a => {
        val id = a._1
        println("inside foreach, id: " + id)
        val itABC = a._2

        val itSeq = itABC.toSeq
        println(itSeq.size)

        val itDS = itSeq.toDS // Get "Caused by: java.lang.NullPointerException" here
        itDS.show()

        funcA(itDS, id)
      }
    )
  )
println(t.toString)

Or

import sparkSession.implicits._
val testDS = sparkSession.createDataFrame(
  Seq(
    ABC("1","2", 1),
    ABC("3","9", 3),
    ABC("8","2", 2),
    ABC("1","2", 3),
    ABC("3","9", 1),
    ABC("2","7", 1),
    ABC("1","3", 2))
).as[ABC]

testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(
    p => p.foreach(
      a => {
        val id = a._1
        println("inside foreach, id: " + id)
        val itABC = a._2

        import sparkSession.implicits._
        val itDS = sparkSession.createDataFrame( 
          sparkSession.sparkContext.parallelize(itABC.toList, numSlices=200)) // get "NullPointerException" here
        itDS.show()

        funcA(itDS, id)
      }
    )
  )

Here's the output log for 1):

    17/10/26 15:07:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 0:>                                                          (0 + 4) / 4]
17/10/26 15:07:29 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 8, 10.142.17.137, executor 0): java.lang.NullPointerException
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/10/26 15:07:29 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 12, 10.142.17.137, executor 0): java.lang.NullPointerException
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
    at com.a.data_pipeline.SL.generateScaleGraphs(SL.scala:165)
    at com.a.data_pipeline.GA$$anonfun$generateGraphsDataScale$1.apply(GA.scala:23)
    at com.a.data_pipeline.GA$$anonfun$generateGraphsDataScale$1.apply(GA.scala:21)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at com.a.data_pipeline.GA$.generateGraphsDataScale(GA.scala:21)
    at com.a.data_pipeline.GA$.main(GA.scala:52)
    at com.a.data_pipeline.GA.main(GA.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:176)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1$$anonfun$apply$1.apply(SL.scala:167)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at com.a.data_pipeline.SL$$anonfun$generateScaleGraphs$1.apply(SL.scala:166)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2) If I use the following code instead, it works fine in local mode, but in cluster mode I get either a NullPointerException or "Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration":

import sparkSession.implicits._
val testDS = sparkSession.createDataFrame(
  Seq(
    ABC("1","2", 1),
    ABC("3","9", 3),
    ABC("8","2", 2),
    ABC("1","2", 3),
    ABC("3","9", 1),
    ABC("2","7", 1),
    ABC("1","3", 2))
).as[ABC]

val test = testDS
  .rdd
  .groupBy(_.c)
  .foreachPartition(
    p => p.foreach(
      a => {
        val id = a._1
        println("inside foreach, id: " + id)
        val itABC = a._2
        val ss = SparkSessionUtil.getInstance(clusterMode)
        import ss.implicits._
        val itDS = ss.createDataFrame(
        ss.sparkContext.parallelize(itABC.toList, numSlices=200)).as[ABC]
        itDS.show()
        funcA(itDS, id)  // in funcA, I'd like to use this itDS(Dataset) to do some calculation, like itDS.groupby().agg().filter()
      }
    )
  )

Here's the system out log for 2):

17/10/26 14:19:12 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
inside foreach, id: 1
17/10/26 14:19:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  1|
|  3|  9|  1|
|  2|  7|  1|
+---+---+---+

inside foreach, id: 2
17/10/26 14:19:14 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
17/10/26 14:19:14 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  8|  2|  2|
|  1|  3|  2|
+---+---+---+

inside foreach, id: 3
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  3|  9|  3|
|  1|  2|  3|
+---+---+---+

I would like to use the Dataset for each id (itDS) in funcA(itDS, id) to calculate something like itDS.groupBy().agg().filter(). How should I solve this problem? Thank you in advance.

Upvotes: 3

Views: 3286

Answers (1)

Ram Ghadiyaram

Reputation: 29165

I recently ran into the same issue, and since the question had no answer, I am adding one. faustineinsun's own comment is the answer:

Thank you, @AlexandreDupriez! The problem has been solved by restructuring the code from sparkSession.sql() to Seq[ABC], so that sparkSession isn't referenced in the map/foreach function closure. sparkSession isn't serializable; it's designed to run on the driver, not on workers.

Conclusion: inside foreach, foreachPartition, map, or mapPartitions you can't create a new DataFrame with the Spark session (.read, .sql, createDataFrame, and so on). Those closures run on the executors, where the session is not usable, so the call throws a NullPointerException.
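The restructuring described in the comment can be sketched roughly as follows. This is only an illustration under stated assumptions, not the asker's actual code: the ABC fields (a, b, c) are inferred from the table output in the question, and this funcA is a hypothetical stand-in that counts rows per value of b. What matters is that funcA takes a plain Seq[ABC] and uses only Scala collections, so nothing inside it refers to sparkSession.

```scala
// Assumed shape of ABC, inferred from the columns a, b, c printed in the question.
final case class ABC(a: String, b: String, c: Int)

object GroupSketch {
  // Hypothetical stand-in for funcA. It receives plain Scala data instead of a
  // Dataset, so it never touches SparkSession and is safe to call on an executor.
  // It returns (id, b, number of rows with that b), sorted by b.
  def funcA(rows: Seq[ABC], id: Int): Seq[(Int, String, Int)] =
    rows
      .groupBy(_.b)
      .toSeq
      .map { case (b, rs) => (id, b, rs.size) }
      .sortBy(_._2)

  def main(args: Array[String]): Unit = {
    // Same sample rows as in the question.
    val data = Seq(
      ABC("1", "2", 1),
      ABC("3", "9", 3),
      ABC("8", "2", 2),
      ABC("1", "2", 3),
      ABC("3", "9", 1),
      ABC("2", "7", 1),
      ABC("1", "3", 2)
    )

    // Local equivalent of grouping by c and running funcA once per group.
    val results = data
      .groupBy(_.c)
      .toSeq
      .sortBy(_._1)
      .flatMap { case (id, rows) => funcA(rows, id) }

    results.foreach(println)
  }
}
```

On a real Dataset[ABC], the same function could presumably be plugged in with something like testDS.groupByKey(_.c).flatMapGroups((id, rows) => funcA(rows.toSeq, id)), with sparkSession.implicits._ imported on the driver for the encoders. That wiring is an untested suggestion. If funcA really needs Dataset operations, the alternative is to loop over the distinct ids on the driver and filter testDS for each one.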

Also have a look at:

How to use SQLContext and SparkContext inside foreachPartition

Upvotes: 4
