Ged

Reputation: 18108

Case Class within foreachRDD causes Serialization Error

I can create a DataFrame inside foreachRDD as long as I do not use a case class: either I let toDF() generate default column names, or I assign names via toDF("c1", "c2").
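For reference, the tuple-based variant described above can be sketched roughly as follows. This is a minimal sketch, assuming the same RDD[List[(String, Int)]] element shape as the code further down; the object name TupleToDF, the method name namedColumns and the column names "name" and "age" are illustrative, not part of the original code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object TupleToDF {
  // Flattens RDD[List[(String, Int)]] to RDD[(String, Int)] and names the columns.
  // Tuples are used directly, so no user-defined case class is involved.
  def namedColumns(spark: SparkSession, rdd: RDD[List[(String, Int)]]): DataFrame = {
    import spark.implicits._
    rdd.flatMap(x => x).toDF("name", "age")
  }
}
```

Inside foreachRDD this would be called as TupleToDF.namedColumns(spark, q).show(false) for each non-empty batch q.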

As soon as I try to use a case class, following the examples I have looked at, I get:

Task not serializable

If I move the case class definition around, I get this instead:

toDF() not part of RDD[CaseClass]

This is legacy (DStreams), but I am curious about yet another serialization error that Spark can produce, and whether it carries over into Structured Streaming.

I have an RDD that need not be split. Maybe that is the issue? No. Could it be because I am running in Databricks?

The code is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

case class Person(name: String, age: Int) // case classes are already Serializable, so "extends Serializable" is not required

val spark = SparkSession.builder
    .master("local[4]")
    .config("spark.driver.cores", 2)
    .appName("forEachRDD")
    .getOrCreate()

val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) 

val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue) 

QS.foreachRDD(q => {
   if(!q.isEmpty) {   
      import spark.implicits._
      val q_flatMap = q.flatMap{x=>x}
      val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
      val df = q_withPerson.toDF() 
      df.show(false)
   }
 }
)

ssc.start()
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
 rddQueue += ssc.sparkContext.parallelize(List(c))
} 
ssc.awaitTermination() 

Upvotes: 0

Views: 194

Answers (1)

Ged

Reputation: 18108

I did not grow up with Java, but after looking around I found out what to do, although I am not expert enough to explain why it works.

I was running in a Databricks notebook, where I prototype.

The clue is that the

case class Person(name: String, age: Int)

was defined inside the same Databricks notebook. The case class needs to be defined outside the current notebook, in a separate notebook, and thus separately from the class running the streaming code.
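One plausible explanation, offered as an assumption rather than something confirmed in this thread: a notebook cell is compiled into a wrapper class, so a case class defined in a cell becomes an inner class whose instances keep a reference to that wrapper. If the wrapper also holds something non-serializable, such as the StreamingContext, shipping the closure to executors fails. The sketch below reproduces the effect with plain Java serialization and no Spark. CellWrapper, InnerPerson and isJavaSerializable are hypothetical names, and the code assumes it is compiled as an ordinary source file rather than pasted into a notebook.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Top-level definition: instances carry no reference to an enclosing instance.
case class Person(name: String, age: Int)

// Hypothetical stand-in for a notebook cell wrapper: a class that is not
// Serializable and that also happens to define a case class.
class CellWrapper {
  val context = new Object // stands in for something like a StreamingContext
  case class InnerPerson(name: String, age: Int)
  def make(name: String, age: Int): InnerPerson = InnerPerson(name, age)
}

object SerializationCheck {
  // Returns true if plain Java serialization of `value` succeeds,
  // false if some object reachable from it is not Serializable.
  def isJavaSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Checking Person("Fred", 53) should succeed, while checking new CellWrapper().make("Fred", 53) should fail, because serializing the inner instance also tries to write the enclosing CellWrapper. Databricks also documents package cells as a way to define classes outside the cell wrapper, which may be an alternative to a separate notebook.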

Upvotes: 0
