Reputation: 1304
Two questions; the answer to the general one will guide me on how minimal I can make an MVCE.
1) How could I have known to register WrappedArray up front (and every other Scala class I might use)? Is it normal to have to register library classes with Kryo?
And the specific one:
2) How do I fix this? (Willing to admit I might have something else screwy going on that is reflecting a false error here, so don't kill yourselves trying to reproduce it.)
DETAILS
Testing out a Spark program in Java using our custom classes related to genetics and statistics, on Spark 1.4.1 and Scala 2.11.5, with the following settings on SparkConf:
// the Kryo serializer wants all classes that need serializing to be registered
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class, PrintHetSharing.class};
SparkConf sparkConf = new SparkConf().setAppName("PipeLinkageData")
<SNIP other settings to declare master>
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//require registration of all classes with Kryo
.set("spark.kryo.registrationRequired", "true")
.registerKryoClasses(kryoClassArray);
Getting this error (repeated at end of long error listing):
Caused by: java.lang.IllegalArgumentException: Class is not
registered: scala.collection.mutable.WrappedArray$ofRef Note: To
register this class use:
kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
But I never call that class from my code. I can add scala.collection.mutable.WrappedArray to the kryoClassArray, but it doesn't fix the problem. If I add scala.collection.mutable.WrappedArray$ofRef.class (as suggested in the error), that's a syntax error; it seems I can't reference that inner class this way from Java?
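For reference, the registration that compiles but doesn't help looks roughly like this (a reconstruction of the attempt described above, not verbatim code):
// compiles, but registers the wrong class: the error is about the inner
// class WrappedArray$ofRef, not WrappedArray itself
Class[] kryoClassArray = new Class[]{DropResult.class, DropEvaluation.class,
        PrintHetSharing.class, scala.collection.mutable.WrappedArray.class};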
MVCE: I have started an MVCE, but the problem is that building one with our classes requires external libraries and text/data files. Once I strip out our classes, the problem goes away. If someone could answer the general question, it might guide me on how much of an MVCE I can come up with.
As I was writing this question I got the go-ahead to update to 1.5.2; I'll see if there's any change there and update the question if so.
Short of an MVCE, here are my class declarations:
public class MVCEPipeLinkageInterface extends LinkageInterface implements Serializable {
class PrintHetSharing implements VoidFunction<DropResult> {
class SparkDoDrop implements Function<Integer, Integer> {
Full errors:
16/01/08 10:54:54 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/01/08 10:54:55 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://[email protected]:55646/user/Executor#214759698]) with ID 0
16/01/08 10:54:55 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.io.IOException: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.makeOffers(CoarseGrainedSchedulerBackend.scala:167)
at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receiveAndReply$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:143)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Class is not registered: scala.collection.mutable.WrappedArray$ofRef
Note: To register this class use: kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
Upvotes: 6
Views: 5527
Reputation: 809
In Scala you can fix this issue by registering scala.collection.mutable.WrappedArray.ofRef[_], as in the following snippet:
conf.registerKryoClasses(
  Array(
    ...
    classOf[Person],
    classOf[Array[Person]],
    ...
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
  )
)
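If you are registering from Java, as in the question, the class literal for the Scala inner class can't be written directly; one way around that is to look the class up by its binary name. A minimal sketch, reusing the question's kryoClassArray (note Class.forName throws a checked ClassNotFoundException, so declare or catch it):
// look up the Scala inner class by its binary name at runtime,
// since Java source can't spell WrappedArray$ofRef as a class literal
Class<?>[] kryoClassArray = new Class<?>[]{
        DropResult.class, DropEvaluation.class, PrintHetSharing.class,
        Class.forName("scala.collection.mutable.WrappedArray$ofRef")};
sparkConf.registerKryoClasses(kryoClassArray);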
Upvotes: 9
Reputation: 5999
You don't need to make everything serializable, regardless of whether it is part of a client library or not. But you DO need to make serializable any lambda that will run on the executors. Those do not run on the driver node, so there is no way to prevent serialization (nor would you want to, since the entire purpose of Spark is distributed computation).
For examples (and if you don't quite grasp the concept yet), check the official Spark docs on this.
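As an illustration using the question's PrintHetSharing: Spark's Java function interfaces (Function, VoidFunction, etc.) already extend java.io.Serializable, but declaring the function as a static nested (or top-level) class keeps it from capturing the enclosing instance, so only the function itself gets serialized. A sketch with a placeholder body:
// static: no hidden reference to the enclosing MVCEPipeLinkageInterface,
// so serializing this function doesn't drag the whole outer object along
static class PrintHetSharing implements VoidFunction<DropResult> {
    @Override
    public void call(DropResult result) throws Exception {
        System.out.println(result); // placeholder; real logic goes here
    }
}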
Upvotes: 2