Danilo Rodrigues

Reputation: 413

Why can I convert a DStream[String] to DStream[List[String]] but not to DStream[DataFrame]?

My question is about DStream handling in legacy Spark Streaming.

I would like to know why converting a DStream[String] to a DStream[List[String]] works fine, but converting the generated lists to DataFrames with the toDF() method throws a NullPointerException.

// Wrap each consumer record's metadata and payload in a Kafka case class.
val data: DStream[Kafka] = stream.map(record => Kafka(record.partition, record.offset, record.topic, record.value))

// Collect each partition's records into a single List[Kafka].
val groupedRDD: DStream[List[Kafka]] = data.mapPartitions(group => {
  val groupedData = group.toList
  List(groupedData).iterator
})
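
(For context, Kafka here is not a library type but my own case class over the consumer record fields; assume something like the sketch below, with field types inferred from how it is constructed above:)

// Hypothetical definition of the Kafka case class used above;
// field types are inferred from the record fields it wraps.
case class Kafka(partition: Int, offset: Long, topic: String, value: String)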

The code works fine up to this point, and the lists inside the groupedRDD variable can be manipulated like any ordinary List; see the sketch below.
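
For example, something like this (a minimal sketch; print() is only there to force evaluation of each batch) runs without any error:

// Plain List operations on the grouped stream behave as expected.
val batchSizes: DStream[Int] = groupedRDD.map(_.size)
batchSizes.print()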

However, when I execute the next block of code, I get a NullPointerException.

// Attempt to convert each List[Kafka] to a DataFrame while staying inside the stream.
val dataFrames: DStream[DataFrame] = groupedRDD.mapPartitions(x => {
  x.map(a => a.toDF)
})

Can anyone explain why this happens? And is there a way around it?

NOTE: I already know that I can convert the data to a DataFrame using foreachRDD, but I would like to know whether there is any way to keep the result inside a DStream, i.e. as a DStream[DataFrame].
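
A minimal sketch of the foreachRDD approach I mean (assuming a SparkSession named spark is in scope on the driver; the flatten and show steps are only illustrative):

// foreachRDD runs on the driver, where a live SparkSession
// (assumed here to be named `spark`) and its implicits are available.
groupedRDD.foreachRDD { rdd =>
  import spark.implicits._
  // Flatten the per-partition lists back into records, then convert.
  val df = rdd.flatMap(identity).toDF()
  df.show()
}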

EDIT:

This is the error I get whenever I try to run the code:

21/06/29 20:32:02 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at app.AppStreaming$.$anonfun$appStreaming$6(AppStreaming.scala:82)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:457)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
21/06/29 20:32:02 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
21/06/29 20:32:02 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
    [identical java.lang.NullPointerException stack traces as task 0.0, omitted for brevity]
21/06/29 20:32:02 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
21/06/29 20:32:02 ERROR JobScheduler: Error running job streaming job 1625009520000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (CPX-G1J9NCNEEJ0.dir.svc.accenture.com executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at app.AppStreaming$.$anonfun$appStreaming$6(AppStreaming.scala:82)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:457)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
    at app.AppStreaming$.$anonfun$appStreaming$7(AppStreaming.scala:87)
    at app.AppStreaming$.$anonfun$appStreaming$7$adapted(AppStreaming.scala:86)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.Try$.apply(Try.scala:209)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at app.AppStreaming$.$anonfun$appStreaming$6(AppStreaming.scala:82)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:457)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    ... 3 more
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (CPX-G1J9NCNEEJ0.dir.svc.accenture.com executor driver): java.lang.NullPointerException
    [same java.lang.NullPointerException stack trace and driver stacktrace as above, omitted for brevity]

Process finished with exit code 1

Upvotes: 0

Views: 109

Answers (0)
