Reputation: 1051
I have enabled checkpointing in my Spark Streaming application and encounter this error on a class that is pulled in as a dependency. Without checkpointing the application works great.
Error:
java.io.NotSerializableException: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
- object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
- field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
- object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
- field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
- field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
- field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
- object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
- field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
- object (class com.fasterxml.jackson.databind.DeserializationConfig, com.fasterxml.jackson.databind.DeserializationConfig@2df0a4c3)
- field (class: com.fasterxml.jackson.databind.ObjectMapper, name: _deserializationConfig, type: class com.fasterxml.jackson.databind.DeserializationConfig)
- object (class com.fasterxml.jackson.databind.ObjectMapper, com.fasterxml.jackson.databind.ObjectMapper@2db07651)
I am not sure how to make this class serializable, since it comes from a Maven dependency. I am using v2.6.0 of the Jackson core in my pom.xml. If I try to use a newer version of Jackson core, I get an Incompatible Jackson version exception.
Code
liveRecordStream
  .foreachRDD(newRDD => {
    if (!newRDD.isEmpty()) {
      val cacheRDD = newRDD.cache()
      val updTempTables = tempTableView(t2s, stgDFMap, cacheRDD)
      val rdd = updatestgDFMap(stgDFMap, cacheRDD)
      persistStgTable(stgDFMap)
      dfMap
        .filter(entry => updTempTables.contains(entry._2))
        .map(spark.sql)
        .foreach(df => writeToES(writer, df))
      cacheRDD.unpersist()
    }
  })
The issue happens only when a method, like tempTableView in this case, is called inside foreachRDD.
tempTableView
def tempTableView(t2s: Map[String, StructType], stgDFMap: Map[String, DataFrame], cacheRDD: RDD[cacheRDD]): Set[String] = {
  stgDFMap.keys.filter { table =>
    val tRDD = cacheRDD
      .filter(r => r.Name == table)
      .map(r => r.values)
    // isEmpty triggers a Spark job, so evaluate it once instead of twice
    val hasRows = !tRDD.isEmpty()
    if (hasRows) {
      val tDF = spark.createDataFrame(tRDD, tableNameToSchema(table))
      tDF.createOrReplaceTempView(s"temp_$table")
    }
    hasRows
  }.toSet
}
Any help is appreciated. I am not sure how to debug this or fix the issue.
Upvotes: 1
Views: 1082
Reputation: 1051
The issue was the Jackson ObjectMapper being serialized; the ObjectMapper should not be serialized. Fixed this by adding @transient val objMapper = new ObjectMapper...
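A minimal sketch of that fix, with a hypothetical enclosing class (the class and field names here are illustrative, not from the original code). Using lazy alongside @transient so the mapper is rebuilt on each executor after deserialization, since a plain @transient val would be null there:

import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical holder that gets captured by a Spark closure / checkpoint.
class RecordSerializer extends Serializable {
  // @transient keeps the non-serializable mapper out of the serialized state;
  // lazy re-creates it on first use on each executor.
  @transient lazy val objMapper: ObjectMapper = new ObjectMapper()

  def toJson(value: AnyRef): String = objMapper.writeValueAsString(value)
}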
Upvotes: 0
Reputation: 4132
From the code snippet you had shared, I don't see where the jackson
library is invoked. However, NotSerializableException
usually happens when you try to send an object which doesn't implement Serializable
interface over wire.
And Spark is distributed processing engine, meaning it works this way: There is a driver and multiple executors across nodes. Only the part of code that is needed to be computed is sent by the driver
to the executors
(over wire). Spark transformations happen in that way i.e. across multiple nodes and if you try pass an instance of a class, which doesn't implement serializable
interface, to such code blocks (the block that executes across nodes), it will throw NotSerializableException
.
Ex:
import com.google.gson.Gson
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val gson: Gson = new Gson()
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("gson-example")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val rdd = spark.sparkContext.parallelize(Seq("0", "1"))
  val something = rdd.map(str => {
    // gson is captured from the driver's scope, so Spark must serialize it
    gson.toJson(str)
  })
  something.foreach(println)
  spark.close()
}
This code block will throw NotSerializableException because we are sending an instance of Gson into a distributed function. map is a Spark transformation, so it executes on the executors. The following will work:
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("gson-example")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  val rdd = spark.sparkContext.parallelize(Seq("0", "1"))
  val something = rdd.map(str => {
    // Gson is created inside the transformation, on the executor
    val gson: Gson = new Gson()
    gson.toJson(str)
  })
  something.foreach(println)
  spark.close()
}
The reason the above works is that we instantiate Gson within the transformation, so it is instantiated on the executor; it is never sent from the driver program over the wire, so no serialization is needed.
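As a follow-up sketch (not from the answer above): if creating the object for every record is too costly, mapPartitions instantiates it once per partition, still on the executor:

val something = rdd.mapPartitions { iter =>
  // One Gson instance per partition, built on the executor,
  // so nothing non-serializable crosses the wire.
  val gson = new Gson()
  iter.map(str => gson.toJson(str))
}
something.foreach(println)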
Upvotes: 5