fledgling

Reputation: 1051

java.io.NotSerializableException with Spark Streaming Checkpoint enabled

I have enabled checkpointing in my Spark Streaming application, and now I get this error on a class that comes in as a dependency.

Without checkpointing, the application works fine.

Error:

com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
    - object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
    - field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
    - object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
    - field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
    - field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
    - object (class com.fasterxml.jackson.databind.DeserializationConfig, com.fasterxml.jackson.databind.DeserializationConfig@2df0a4c3)
    - field (class: com.fasterxml.jackson.databind.ObjectMapper, name: _deserializationConfig, type: class com.fasterxml.jackson.databind.DeserializationConfig)
    - object (class com.fasterxml.jackson.databind.ObjectMapper, com.fasterxml.jackson.databind.ObjectMapper@2db07651)

I am not sure how to make this class serializable, since it comes from a Maven dependency. I am using Jackson core v2.6.0 in my pom.xml. If I try a newer version of Jackson core, I get an "Incompatible Jackson version" exception.

Code

liveRecordStream
      .foreachRDD(newRDD => {
        if (!newRDD.isEmpty()) {
          val cacheRDD = newRDD.cache()
          val updTempTables = tempTableView(t2s, stgDFMap, cacheRDD)
          val rdd = updatestgDFMap(stgDFMap, cacheRDD)
          persistStgTable(stgDFMap)
          dfMap
            .filter(entry => updTempTables.contains(entry._2))
            .map(spark.sql)
            .foreach( df => writeToES(writer, df))

          cacheRDD.unpersist()
        }
      })

The issue happens only when a method such as tempTableView is called inside foreachRDD.
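A plausible reason the method call matters, shown as a minimal sketch under assumptions: if a method like tempTableView lives on a class that also holds the ObjectMapper, the closure passed to foreachRDD calls a method on `this`, so serializing the closure (which checkpointing does when it saves the DStream graph) also tries to serialize every non-transient field of that class. StreamingJob, FixedStreamingJob and FakeMapper are hypothetical stand-ins, not the code above, and plain Java serialization stands in for Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for Jackson's ObjectMapper: it does not implement Serializable.
class FakeMapper

// Hypothetical job class: Serializable itself, but holding a non-serializable field.
class StreamingJob extends Serializable {
  val mapper = new FakeMapper

  def tempTableName(table: String): String = s"temp_$table"

  // The lambda calls a method of the enclosing class, so it captures `this`
  // and, with it, the non-transient `mapper` field.
  def perBatch: String => String = table => tempTableName(table)
}

// Same job, but the mapper is excluded from serialization.
class FixedStreamingJob extends Serializable {
  @transient lazy val mapper = new FakeMapper

  def tempTableName(table: String): String = s"temp_$table"

  def perBatch: String => String = table => tempTableName(table)
}

object CaptureCheck {
  // Tries plain Java serialization of `obj`; false if some reachable object is not Serializable.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"StreamingJob closure serializable: ${canSerialize(new StreamingJob().perBatch)}")
    println(s"FixedStreamingJob closure serializable: ${canSerialize(new FixedStreamingJob().perBatch)}")
  }
}
```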

tempTableView

def tempTableView(t2s: Map[String, StructType], stgDFMap: Map[String, DataFrame], cacheRDD: RDD[cacheRDD]): Set[String] = {
    stgDFMap.keys.filter { table =>
      val tRDD = cacheRDD
        .filter(r => r.Name == table)
        .map(r => r.values)
      val tDF = spark.createDataFrame(tRDD, tableNameToSchema(table))
      if (!tRDD.isEmpty()) {
        val tName = s"temp_$table"
        tDF.createOrReplaceTempView(tName)
      }
      !tRDD.isEmpty()
    }.toSet
  }

Any help is appreciated. I am not sure how to debug or fix this.

Upvotes: 1

Views: 1082

Answers (2)

fledgling

Reputation: 1051

The issue was that a Jackson ObjectMapper was being serialized, and an ObjectMapper should not be serialized. I fixed this by declaring it as @transient val objMapper = new ObjectMapper...
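If the field is used again after its owner has been deserialized (for example after recovering from a checkpoint), the distinction between a plain `@transient val` and a `@transient lazy val` matters: both are skipped during serialization, but the plain field comes back as null, while the lazy one is rebuilt on first access. The minimal sketch below shows this with plain Java serialization; FakeMapper, EagerHolder and LazyHolder are hypothetical stand-ins for the ObjectMapper and the class that owns it.

```scala
import java.io._

// Hypothetical stand-in for a non-serializable object such as Jackson's ObjectMapper.
class FakeMapper {
  def write(s: String): String = "\"" + s + "\""
}

class EagerHolder extends Serializable {
  // Skipped during serialization and NOT rebuilt: null after deserialization.
  @transient val mapper: FakeMapper = new FakeMapper
}

class LazyHolder extends Serializable {
  // Skipped during serialization and rebuilt on first access after deserialization.
  @transient lazy val mapper: FakeMapper = new FakeMapper
}

object TransientDemo {
  // Serializes and deserializes `obj` with plain Java serialization.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new LazyHolder
    original.mapper.write("warm-up") // force the lazy val before serializing
    val restored = roundTrip(original)
    println(s"lazy mapper after round trip: ${restored.mapper.write("ok")}")
    println(s"eager mapper after round trip: ${roundTrip(new EagerHolder).mapper}")
  }
}
```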

Upvotes: 0

Sivaprasanna Sethuraman

Reputation: 4132

From the code snippet you shared, I don't see where the Jackson library is used. However, a NotSerializableException usually occurs when you try to send an object that doesn't implement the Serializable interface over the wire.

Spark is a distributed processing engine: there is a driver and multiple executors spread across nodes. The driver sends only the code that needs to be computed to the executors, over the wire. Transformations run this way, across multiple nodes, so if you pass an instance of a class that doesn't implement the Serializable interface into such a code block (one that executes on the executors), Spark throws a NotSerializableException.

Ex:

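import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {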
   val gson: Gson = new Gson()

   val sparkConf = new SparkConf().setMaster("local[2]")
   val spark = SparkSession.builder().config(sparkConf).getOrCreate()
   val rdd = spark.sparkContext.parallelize(Seq("0","1"))

   val something = rdd.map(str => {
     gson.toJson(str)
   })

   something.foreach(println)
   spark.close()
}

This code throws a NotSerializableException because it passes a Gson instance into a distributed function: map is a Spark transformation, so its closure runs on the executors. The following version works:

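import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {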

   val sparkConf = new SparkConf().setMaster("local[2]")
   val spark = SparkSession.builder().config(sparkConf).getOrCreate()
   val rdd = spark.sparkContext.parallelize(Seq("0","1"))

   val something = rdd.map(str => {
     val gson: Gson = new Gson()
     gson.toJson(str)
   })

   something.foreach(println)
   spark.close()
}

This works because Gson is instantiated inside the transformation, so it is created on the executor and never has to be serialized and sent over the wire from the driver.
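The same contrast can be reproduced without a cluster. This minimal sketch checks whether two functions survive Java serialization: one captures a JsonWriter created outside it (like the first Gson example), the other creates its JsonWriter inside its body (like the second). JsonWriter is a hypothetical stand-in for Gson; in Spark itself, the first case surfaces as a "Task not serializable" error caused by the NotSerializableException.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for Gson: a class that does not implement Serializable.
class JsonWriter {
  def toJson(s: String): String = "\"" + s + "\""
}

// Serializable, so the demo can only fail because of JsonWriter itself.
object ClosureDemo extends Serializable {
  // Tries plain Java serialization of `obj`; false if some reachable object is not Serializable.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like the first example: the function captures an instance created outside it.
  def capturing(writer: JsonWriter): String => String = s => writer.toJson(s)

  // Like the second example: the instance is created inside the function body.
  def selfContained: String => String = s => new JsonWriter().toJson(s)

  def main(args: Array[String]): Unit = {
    println(s"capturing closure serializable: ${canSerialize(capturing(new JsonWriter))}")
    println(s"self-contained closure serializable: ${canSerialize(selfContained)}")
  }
}
```

Creating the object inside map builds one instance per record; when construction is expensive, mapPartitions is a common way to create one instance per partition instead.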

Upvotes: 5
