I am working on a Flink project and would like to parse the source JSON string data into JSON objects. I am using jackson-module-scala for the JSON parsing. However, I ran into some issues when using the JSON parser inside Flink API calls (map, for example).
Below are some code examples. I cannot understand why they behave the way they do under the hood.
In this case, I am doing what the official jackson-module-scala example code says to do: create an ObjectMapper, register DefaultScalaModule (a Scala object that includes support for all currently supported Scala data types), and call readValue in order to parse the JSON to a Map.
The error I got is: org.apache.flink.api.common.InvalidProgramException: Task not serializable.
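import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.scala._

object JsonProcessing {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // get input data
    val text = env.readTextFile("xxx")
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    // the closure captures mapper, which already has DefaultScalaModule registered
    val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
    // execute and print result
    counts.print()
    env.execute("JsonProcessing")
  }
}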
Then I did some searching on Google and came up with the following solution, where registerModule is moved into the map function.
val mapper = new ObjectMapper
val counts = text.map(l => {
  mapper.registerModule(DefaultScalaModule)
  mapper.readValue(l, classOf[Map[String, String]])
})
However, what I cannot understand is why this works, given that it still calls a method on the mapper object defined outside the function. Is it because ObjectMapper itself is Serializable, as stated here: ObjectMapper.java#L114?
Now the JSON parsing works, but I have to call mapper.registerModule(DefaultScalaModule) for every record, which I think could cause a performance issue (does it?). I also tried another solution, as follows.
I created a new case class Jsen and used it as the target class for parsing, registering the Scala modules. This also works.
However, this is not very flexible if the input JSON changes often, because the Jsen class then becomes hard to maintain.
case class Jsen(
  @JsonProperty("a") a: String,
  @JsonProperty("c") c: String,
  @JsonProperty("e") e: String
)
object JsonProcessing {
  def main(args: Array[String]): Unit = {
    ...
    val mapper = new ObjectMapper
    val counts = text.map(mapper.readValue(_, classOf[Jsen]))
    ...
  }
}
Additionally, I tried using JsonNode without calling registerModule, as follows:
...
val mapper = new ObjectMapper
val counts = text.map(mapper.readValue(_, classOf[JsonNode]))
...
It is working fine as well.
My main question is: what is actually causing the Task not serializable problem under the hood of registerModule(DefaultScalaModule)?
How can I tell, while coding, whether my code could cause this kind of serialization problem?
Upvotes: 2
The thing is that Apache Flink is designed to be distributed, which means it needs to be able to run your code remotely, so all of your processing functions must be serializable. In the current implementation this is ensured early on, when you build your streaming job, even if you never run it in a distributed mode. This is a trade-off, with the obvious benefit of giving you feedback down to the very line that breaks this contract (via the exception stack trace).
So when you write
val counts = text.map(mapper.readValue(_, classOf[Map[String, String]]))
what you actually write is something like
val counts = text.map(new Function1[String, Map[String, String]] {
  val capturedMapper = mapper
  override def apply(param: String) = capturedMapper.readValue(param, classOf[Map[String, String]])
})
The important thing here is that you capture the mapper from the outside context and store it as part of your Function1 object, which has to be serializable. This means that the mapper has to be serializable too. The designers of the Jackson library recognized this need and, since there is nothing fundamentally non-serializable in a mapper, they made ObjectMapper and the default Modules serializable. Unfortunately for you, the designers of the Scala Jackson module missed that and made DefaultScalaModule deeply non-serializable by making ScalaTypeModifier and all of its subclasses non-serializable. This is why your second version works while the first one doesn't: a "raw" ObjectMapper is serializable, while an ObjectMapper with a pre-registered DefaultScalaModule is not.
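The capture described above can be reproduced without Flink or Jackson. The following is a minimal sketch using plain Java serialization. All names in it (Parser, NonSerializableMapper, SerializableMapper, CapturingFunction, isSerializable) are hypothetical and introduced only for illustration: the two mapper classes stand in for a mapper with DefaultScalaModule registered and for a "raw" mapper. Running this kind of check on a function before passing it to map is one way to spot the problem while coding.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical parser interface, used only in this sketch.
  trait Parser { def parse(s: String): Int }

  // Stand-in for a mapper with a non-serializable module registered.
  final class NonSerializableMapper extends Parser {
    def parse(s: String): Int = s.length
  }

  // Stand-in for a "raw" mapper that is Serializable.
  final class SerializableMapper extends Parser with Serializable {
    def parse(s: String): Int = s.length
  }

  // Explicit form of the closure: the captured mapper becomes a field,
  // so it is written out together with the function object.
  final class CapturingFunction(mapper: Parser) extends (String => Int) with Serializable {
    def apply(s: String): Int = mapper.parse(s)
  }

  // Returns true if Java serialization accepts the whole object graph.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Under these assumptions, isSerializable rejects a CapturingFunction built around NonSerializableMapper and accepts one built around SerializableMapper, even though the function class itself is marked Serializable in both cases.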
There are a few possible workarounds. Probably the easiest one is to wrap the ObjectMapper in a serializable object:
object MapperWrapper extends java.io.Serializable {
  // this lazy is the important trick here
  // @transient adds some safety in current Scala (see also Update section)
  @transient lazy val mapper = {
    val mapper = new ObjectMapper
    mapper.registerModule(DefaultScalaModule)
    mapper
  }

  def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
and then use it as
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
This lazy trick works because although an instance of DefaultScalaModule is not serializable, the function to create an instance of DefaultScalaModule is.
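The wrapper pattern can also be tried in isolation. Here is a rough sketch that assumes nothing beyond the Scala and Java standard libraries. NonSerializableParser, ParserHolder, ParseFn and roundTrip are hypothetical names, with NonSerializableParser standing in for a mapper that has DefaultScalaModule registered. The function object has no fields of its own and only refers to the holder object, so there is nothing non-serializable to write out.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyHolderSketch {
  // Stand-in for a mapper that cannot be serialized.
  final class NonSerializableParser {
    def parse(s: String): Int = s.length
  }

  // Same shape as MapperWrapper: the parser is created on first use.
  object ParserHolder extends Serializable {
    @transient lazy val parser: NonSerializableParser = new NonSerializableParser
    def parse(s: String): Int = parser.parse(s)
  }

  // The function refers to the holder object instead of capturing a parser instance.
  final class ParseFn extends (String => Int) with Serializable {
    def apply(s: String): Int = ParserHolder.parse(s)
  }

  // Serializes a value to bytes and reads it back, as a remote worker would.
  def roundTrip(value: AnyRef): AnyRef = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject() finally in.close()
  }
}
```

A copy of ParseFn obtained through roundTrip should still be callable, because the parser is rebuilt (or reused) through ParserHolder on whichever side the function runs.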
Update: what about @transient?
What are the differences here if I add lazy val vs. @transient lazy val?
This is actually a tricky question. A lazy val is compiled to something like this:
object MapperWrapper extends java.io.Serializable {
  // @transient is set or not set for both fields depending on its presence at "lazy val"
  [@transient] private var mapperValue: ObjectMapper = null
  [@transient] @volatile private var mapperInitialized = false

  def mapper: ObjectMapper = {
    if (!mapperInitialized) {
      this.synchronized {
        // second check so that only one thread runs the initializer
        if (!mapperInitialized) {
          val mapper = new ObjectMapper
          mapper.registerModule(DefaultScalaModule)
          mapperValue = mapper
          mapperInitialized = true
        }
      }
    }
    mapperValue
  }

  def readValue[T](content: String, valueType: Class[T]): T = mapper.readValue(content, valueType)
}
where @transient on the lazy val affects both backing fields. So now you can see why the lazy val trick works:
Locally, it works because it delays initialization of the mapperValue field until the first access to the mapper method, so the field is safely null when the serialization check is performed.
Remotely, it works because MapperWrapper is fully serializable and the logic for initializing the lazy val is put into a method of the same class (see def mapper).
Note, however, that AFAIK the way a lazy val is compiled is an implementation detail of the current Scala compiler rather than part of the Scala specification. If at some later point a class similar to .NET's Lazy is added to the Java standard library, the Scala compiler might start generating different code. This is important because it creates a trade-off for @transient. The benefit of adding @transient now is that it ensures that code like this works as well:
val someJson:String = "..."
val something:Something = MapperWrapper.readValue(someJson:String, ...)
val counts = text.map(MapperWrapper.readValue(_, classOf[Map[String, String]]))
Without @transient the code above will fail, because we have forced initialization of the lazy backing field and it now contains a non-serializable value. With @transient this is not an issue, as that field will not be serialized at all.
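The effect of forcing the lazy val can be observed with a small experiment. This is only a sketch: it uses plain classes rather than an object so that object-specific serialization handling does not interfere, and its outcome depends on how the compiler in use generates lazy val fields, which is the implementation detail mentioned above. NonSerializableParser, PlainLazy, TransientLazy and isSerializable are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object TransientLazySketch {
  // Stand-in for a mapper that cannot be serialized.
  final class NonSerializableParser {
    def parse(s: String): Int = s.length
  }

  // Backing field is an ordinary field: it is serialized once it holds a value.
  final class PlainLazy extends Serializable {
    lazy val parser: NonSerializableParser = new NonSerializableParser
  }

  // Backing field is marked transient: it is skipped during serialization.
  final class TransientLazy extends Serializable {
    @transient lazy val parser: NonSerializableParser = new NonSerializableParser
  }

  // Returns true if Java serialization accepts the whole object graph.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

With the field layout described above, a PlainLazy instance is expected to serialize before parser is first accessed and to fail afterwards, while a TransientLazy instance is expected to serialize in both states.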
A potential drawback of @transient is that if Scala changes the way code for a lazy val is generated while the field is still marked as @transient, the field might not actually be deserialized in the remote-work scenario.
There is also a subtlety with object: for objects, the Scala compiler generates custom deserialization logic (it overrides readResolve) to return the same singleton object. This means that the object containing the lazy val is not really deserialized, and the value from the object itself is used. As a result, a @transient lazy val inside an object is much more future-proof than one inside a class in the remote scenario.
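The singleton behaviour of object under deserialization can be checked directly. The sketch below assumes only the standard library. Registry, Plain and roundTrip are hypothetical names used for illustration. It compares a serializable object with an ordinary serializable class after a serialize/deserialize round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SingletonRoundTrip {
  // A serializable singleton, comparable in shape to MapperWrapper.
  object Registry extends Serializable {
    var hits: Int = 0
  }

  // An ordinary serializable class for comparison.
  final class Plain(var hits: Int) extends Serializable

  // Serializes a value to bytes and reads it back.
  def roundTrip(value: AnyRef): AnyRef = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject() finally in.close()
  }
}
```

In this setup, reading Registry back is expected to yield the very same instance that already exists in the JVM, whereas reading a Plain instance back yields a fresh copy. That is why state held inside an object, including an initialized lazy val, comes from the local singleton rather than from the serialized bytes.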
Upvotes: 5