Reputation: 5869
I am going to ask this question in the context of Spark, because that's what I'm facing, but this might be a plain Java issue.
In our Spark job, we have a Resolver which needs to be used in all of our workers (it's used in a UDF). The problem is that it's not serializable and we cannot change it to be so. The solution was to make it a member of another class which is serializable.
So we ended up with:
public class Analyzer implements Serializable {
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    public int resolve(String key) {
        return resolver.find(key);
    }
}
We then broadcast an instance of this class using the Spark API:
val analyzer = sparkContext.broadcast(new Analyzer())
(more information about Spark broadcast can be found here)
We then proceed to use analyzer in a UDF, as part of our Spark code, with something like:
import org.apache.spark.sql.functions.{col, udf}
val resolve = udf((key: String) => analyzer.value.resolve(key))
val result = myDataFrame.select(col("key"), resolve(col("key"))).count()
This all works as expected, but leaves us wondering.
Resolver does not implement Serializable and is therefore marked as transient, meaning it does not get serialized along with its owner object Analyzer.
But as you can clearly see from the code above, the resolve() method uses resolver, so it must not be null. And indeed it isn't: the code works.
So if the field is not passed through serialization, how is the resolver member instantiated?
My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the Spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. But it's only printed once, which probably indicates that it's only called once, right before it's passed to the broadcast API. So why isn't resolver null?
Am I missing something about JVM serialization or Spark serialization?
How does this code even work?
Spark runs on YARN, in cluster mode.
spark.serializer is set to org.apache.spark.serializer.KryoSerializer.
Upvotes: 11
Views: 3109
Reputation: 149538
So if the field is not passed through serialization, how is the resolver member instantiated?
It is instantiated via the constructor call (new Resolver()), which runs because Kryo invokes Analyzer's no-arg constructor when deserializing the broadcast value, i.e. when invoking:
kryo.readClassAndObject(input).asInstanceOf[T]
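To see that mechanism in isolation, here is a minimal standalone Kryo sketch (plain Kryo, outside of Spark; the Resolver stand-in is hypothetical, since the real class is not shown in the question). The transient field is skipped on write, and the no-arg constructor runs again on read, which is what leaves resolver non-null:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Hypothetical stand-in for the Resolver in the question (the real class is not shown).
class Resolver {
    int find(String key) { return key.length(); }
}

// Same shape as the Analyzer in the question.
class Analyzer implements java.io.Serializable {
    transient Resolver resolver;

    public Analyzer() {
        System.out.println("Initializing a Resolver...");
        resolver = new Resolver();
    }

    public int resolve(String key) { return resolver.find(key); }
}

public class KryoTransientDemo {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false); // keep the demo independent of Kryo-version defaults

        Analyzer original = new Analyzer();          // prints "Initializing a Resolver..." (first time)

        Output output = new Output(1024, -1);
        kryo.writeClassAndObject(output, original);  // the transient resolver field is NOT written

        Input input = new Input(output.toBytes());
        // Kryo builds the copy through Analyzer's no-arg constructor, so the message
        // prints again and a fresh Resolver is assigned to the transient field.
        Analyzer copy = (Analyzer) kryo.readClassAndObject(input);

        System.out.println(copy.resolve("some-key")); // resolver is non-null on the "receiving" side
    }
}

Run locally, the constructor message appears twice, once for the original and once for the deserialized copy, which is essentially what happens on an executor when it deserializes the broadcast value.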
My initial thought was that maybe the Analyzer constructor is called on the receiving side (i.e. the spark worker), but then I would expect to see the line "Initializing a Resolver..." printed several times. But it's only printed once, which is probably an indication to the fact that it's only called once
That's not how a broadcast variable works. What happens is that when each executor needs the broadcast variable in scope, it first checks whether it already has the object in memory in its BlockManager. If it doesn't, it asks either the driver or neighboring executors (if there are multiple executors on the same worker node) for their cached instance; they serialize it and send it over, and the receiving executor deserializes it and caches it inside its own BlockManager.
This is documented in the behavior of the TorrentBroadcast
(which is the default broadcasting implementation):
* The driver divides the serialized object into small chunks and
* stores those chunks in the BlockManager of the driver.
*
* On each executor, the executor first attempts to fetch the object from its BlockManager. If
* it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
* other executors if available. Once it gets the chunks, it puts the chunks in its own
* BlockManager, ready for other executors to fetch from.
*
* This prevents the driver from being the bottleneck in sending out multiple copies of the
* broadcast data (one per executor).
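For completeness, here is a hedged Java sketch of the same kind of job as in the question (the question's snippet is Scala), assuming the Analyzer class above is packaged with the application and the master/deploy mode are supplied via spark-submit (e.g. --master yarn --deploy-mode cluster). The comments map each step onto the TorrentBroadcast description:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastDemo {
    public static void main(String[] args) {
        // Master and deploy mode come from spark-submit, matching the question's setup.
        SparkConf conf = new SparkConf()
                .setAppName("broadcast-transient-demo")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Runs once, on the driver; the driver then stores the serialized Analyzer as
        // chunks in its BlockManager. This is the single "Initializing a Resolver..."
        // line the question observes.
        Broadcast<Analyzer> analyzer = sc.broadcast(new Analyzer());

        // When a task first touches the broadcast, its executor fetches the chunks,
        // deserializes the Analyzer (per the answer, Kryo invokes the no-arg
        // constructor, so the transient resolver is rebuilt) and caches it in its
        // own BlockManager. The constructor's println therefore lands in each
        // executor's log rather than in the driver's output, which would explain
        // why it is seen only once.
        List<Integer> resolved = sc.parallelize(Arrays.asList("a", "bb", "ccc"), 3)
                .map(key -> analyzer.value().resolve(key))
                .collect();

        System.out.println(resolved);
        sc.stop();
    }
}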
if we remove the transient it fails, and the stack-trace leads to Kryo
That is because there is probably a field inside your Resolver class which even Kryo is unable to serialize, regardless of whether the class is marked Serializable.
Upvotes: 3