fluency03

Reputation: 2697

flink parsing JSON in map: InvalidProgramException: Task not serializable

I am working on a Flink project and would like to parse the source JSON string data into JSON objects. I am using jackson-module-scala for the JSON parsing. However, I ran into some issues when using the JSON parser inside Flink APIs (map, for example).

Here are some examples of the code, and I cannot understand the reason under the hood why it is behaving like this.

Situation 1:

In this case, I am doing what the official jackson-module-scala example code told me to do:

  1. create a new ObjectMapper
  2. register the DefaultScalaModule

    DefaultScalaModule is a Scala object that includes support for all currently supported Scala data types.

  3. call the readValue in order to parse the JSON to Map

The error I got is: org.apache.flink.api.common.InvalidProgramException:Task not serializable.

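import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.scala._

object JsonProcessing {
  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile("xxx")

    // the mapper is created outside the map function and captured by it
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))

    // execute and print result
    counts.print()

    env.execute("JsonProcessing")
  }

}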

Situation 2:

Then I did some searching and came up with the following solution, where registerModule is moved into the map function.

val mapper = new ObjectMapper
val counts = text.map(l => {
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(l, classOf[Map[String, String]])
})

However, what I cannot understand is why this works, given that it still calls a method on the mapper object defined outside the function. Is it because ObjectMapper itself is Serializable, as stated here: ObjectMapper.java#L114?

Now, the JSON parsing is working fine, but every time, I have to call mapper.registerModule(DefaultScalaModule) which I think could cause some performance issue (Does it?). I also tried another solution as follows.

Situation 3:

I created a new case class, Jsen, and used it as the target class for parsing, registering the Scala modules. This also works fine.

However, this is not very flexible if the input JSON changes often, because the Jsen class then becomes hard to maintain.

case class Jsen(
  @JsonProperty("a") a: String,
  @JsonProperty("c") c: String,
  @JsonProperty("e") e: String
)

object JsonProcessing {
  def main(args: Array[String]) {
    ...
    val mapper = new ObjectMapper
    val counts = text.map(mapper.readValue(_, classOf[Jsen]))
    ...

}

Additionally, I also tried using JsonNode without calling registerModule as follows:

    ...
    val mapper = new ObjectMapper
    val counts = text.map(mapper.readValue(_, classOf[JsonNode]))
    ...

It is working fine as well.

My main question is: what is actually causing the problem of Task not serializable under the hood of registerModule(DefaultScalaModule)?

How to identify whether your code could potentially cause this unserializable problem during coding?

Upvotes: 2

Views: 2239

Answers (1)

SergGr

Reputation: 23788

The thing is that Apache Flink is designed to be distributed, which means it needs to be able to run your code remotely. So all your processing functions have to be serializable. In the current implementation this is ensured early on, when you build your streaming process, even if you never run it in any distributed mode. This is a trade-off with the obvious benefit of giving you feedback down to the very line that breaks this contract (via the exception stack trace).

So when you write

val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))

what you actually write is something like

val counts = text.map(new Function1[String, Map[String, String]] {
    val capturedMapper = mapper

    override def apply(param: String) = capturedMapper.readValue(param, classOf[Map[String, String]])
})

The important thing here is that you capture the mapper from the outside context and store it as part of your Function1 object, which has to be serializable. This means that the mapper has to be serializable too. The designers of the Jackson library recognized that need, and since there is nothing fundamentally non-serializable in a mapper, they made ObjectMapper and the default modules serializable. Unfortunately for you, the designers of the Scala Jackson module missed that and made DefaultScalaModule deeply non-serializable by making ScalaTypeModifier and all its subclasses non-serializable. This is why your second code works while the first one doesn't: a "raw" ObjectMapper is serializable, while an ObjectMapper with a pre-registered DefaultScalaModule is not.
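As a rough way to spot this kind of problem while coding, you can push a function through plain Java serialization yourself. The following is only a minimal sketch, not what Flink does internally: NonSerializableParser, SerializationCheck, isSerializable and demo are hypothetical names invented for illustration, and it assumes Scala 2.12 or later, where function literals are compiled as serializable lambdas.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable dependency, such as an
// ObjectMapper with DefaultScalaModule registered.
class NonSerializableParser {
  def parse(s: String): Int = s.length
}

object SerializationCheck {
  // Returns true when plain Java serialization accepts the whole object graph.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Checks one function that captures the parser and one that captures nothing.
  def demo(): (Boolean, Boolean) = {
    val parser = new NonSerializableParser
    // Captures the local `parser`, so the function object drags it along.
    val capturing: String => Int = s => parser.parse(s)
    // Uses only its own argument, so nothing is captured.
    val selfContained: String => Int = s => s.length
    (isSerializable(capturing), isSerializable(selfContained))
  }

  def main(args: Array[String]): Unit = {
    val (capturingOk, selfContainedOk) = demo()
    println(s"capturing function serializable: $capturingOk")
    println(s"self-contained function serializable: $selfContainedOk")
  }
}
```

The point of the sketch is that the function itself is fine; what decides the outcome is whatever the function captures from the enclosing scope.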

There are a few possible workarounds. Probably the easiest one is to wrap ObjectMapper

object MapperWrapper extends java.io.Serializable {
  // this lazy is the important trick here
  // @transient adds some safety in current Scala (see also Update section)
  @transient lazy val mapper = {
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper
  }

  def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
} 

and then use it as

val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))

This lazy trick works because although an instance of DefaultScalaModule is not serializable, the function to create an instance of DefaultScalaModule is.


Update: what about @transient?

what are the differences here, if I add lazy val vs. @transient lazy val?

This is actually a tricky question. What the lazy val is compiled to is actually something like this:

object MapperWrapper extends java.io.Serializable {

  // @transient is set or not set for both fields depending on its presence at "lazy val" 
  [@transient] private var mapperValue: ObjectMapper = null
  [@transient] @volatile private var mapperInitialized = false

  def mapper: ObjectMapper = {
    if (!mapperInitialized) {
      this.synchronized {
        // re-check under the lock so only one thread runs the initializer
        if (!mapperInitialized) {
          val mapper = new ObjectMapper
          mapper.registerModule(DefaultScalaModule)
          mapperValue = mapper
          mapperInitialized = true
        }
      }
    }
    mapperValue
  }


  def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}

where @transient on the lazy val affects both backing fields. So now you can see why lazy val trick works:

  1. locally it works because it delays initialization of the mapperValue field until the first access to the mapper method, so the field is still null (and therefore safe) when the serialization check is performed

  2. remotely it works because MapperWrapper is fully serializable and the logic of how lazy val should be initialized is put into a method of the same class (see def mapper).

Note, however, that AFAIK the way lazy val is compiled is an implementation detail of the current Scala compiler rather than a part of the Scala specification. If at some later point a class similar to .NET's Lazy is added to the Java standard library, the Scala compiler might start generating different code. This is important because it creates a trade-off for @transient. The benefit of adding @transient now is that it ensures that code like this works as well:

val someJson:String = "..."
val something:Something = MapperWrapper.readValue(someJson:String, ...)
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))

Without @transient the code above will fail because we forced initialization of the lazy backing field and now it contains a non-serializable value. With @transient this is not an issue as that field will not be serialized at all.
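The difference can be reproduced without Flink or Jackson. The sketch below is an illustration under stated assumptions: FakeMapper, PlainLazyHolder, TransientLazyHolder and TransientLazyDemo are hypothetical names, classes are used instead of an object so that instances really go through Java serialization, and the re-initialization after de-serialization relies on how Scala 2.12/2.13 compile lazy val.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException,
  ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a mapper with DefaultScalaModule registered.
class FakeMapper {
  def parse(s: String): Int = s.length
}

// Lazy field without @transient: serialized together with the holder once set.
class PlainLazyHolder extends Serializable {
  lazy val mapper: FakeMapper = new FakeMapper
}

// Lazy field with @transient: skipped by Java serialization.
class TransientLazyHolder extends Serializable {
  @transient lazy val mapper: FakeMapper = new FakeMapper
}

object TransientLazyDemo {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def deserialize[T](data: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def canSerialize(obj: AnyRef): Boolean =
    try { serialize(obj); true }
    catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    val plain = new PlainLazyHolder
    println(canSerialize(plain))      // lazy field not initialized yet
    plain.mapper.parse("warm-up")     // forces initialization
    println(canSerialize(plain))      // backing field now holds a FakeMapper

    val safe = new TransientLazyHolder
    safe.mapper.parse("warm-up")      // forces initialization
    println(canSerialize(safe))       // transient backing field is skipped

    // On the copy the lazy val is evaluated again on first access.
    val copy = deserialize[TransientLazyHolder](serialize(safe))
    println(copy.mapper.parse("abc"))
  }
}
```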

A potential drawback of @transient is that if Scala changes the way code for lazy val is generated while the field is marked as @transient, the field might not be de-serialized at all in the remote-work scenario.

There is also a trick with object: for objects, the Scala compiler generates custom de-serialization logic (it overrides readResolve) that returns the same singleton object. This means that the object containing the lazy val is not really de-serialized, and the value from the object itself is used. So a @transient lazy val inside an object is much more future-proof than one inside a class in the remote scenario.
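The singleton behavior can be checked with a small round trip. This is a sketch only: SingletonWrapper and SingletonRoundTrip are hypothetical names, a plain Object stands in for the mapper, and it assumes a top-level object compiled with Scala 2.12 or later and run as an ordinary program (however the compiler arranges the singleton resolution in that version).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical singleton shaped like MapperWrapper; java.lang.Object is not
// serializable, so it plays the role of the non-serializable mapper.
object SingletonWrapper extends Serializable {
  @transient lazy val token: Object = new Object
}

object SingletonRoundTrip {
  // Serializes the value to a byte array and reads it straight back.
  def roundTrip(obj: AnyRef): AnyRef = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val before = SingletonWrapper.token // forces the lazy val
    val copy = roundTrip(SingletonWrapper).asInstanceOf[SingletonWrapper.type]
    // Reference equality: does de-serialization hand back the existing singleton?
    println(copy eq SingletonWrapper)
    // And is the already initialized lazy value the one that gets used?
    println(copy.token eq before)
  }
}
```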

Upvotes: 5
