Reputation: 619
I have been learning Spark recently and I ran into an issue with Protocol Buffers. I ran the code below in Spark and got "java.lang.RuntimeException: Unable to find proto buffer class".
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat
import com.oreilly.learningsparkexamples.proto.Places

object BasicSaveProtoBuf {
  def main(args: Array[String]) {
    val master = args(0)
    val outputFile = args(1)
    val sc = new SparkContext(master, "BasicSaveProtoBuf", System.getenv("SPARK_HOME"))
    val conf = new Configuration()
    // Tell the output format which protobuf class it will be writing
    LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf)
    val dnaLounge = Places.Venue.newBuilder()
    dnaLounge.setId(1)
    dnaLounge.setName("DNA Lounge")
    dnaLounge.setType(Places.Venue.VenueType.CLUB)
    val data = sc.parallelize(List(dnaLounge.build()))
    val outputData = data.map { pb =>
      val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue])
      protoWritable.set(pb)
      (null, protoWritable)
    }
    outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[ProtobufWritable[Places.Venue]],
      classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
  }
}
And places.proto is:
message Venue {
  required int32 id = 1;
  required string name = 2;
  required VenueType type = 3;
  optional string address = 4;

  enum VenueType {
    COFFEESHOP = 0;
    WORKPLACE = 1;
    CLUB = 2;
    OMNOMNOM = 3;
    OTHER = 4;
  }
}
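(The stack trace below references the generated Java class com.oreilly.learningsparkexamples.proto.Places$Venue, so the full proto file presumably also carries Java options along these lines; this is an assumption, as they are not shown above:)
// Assumed options, inferred from the generated class name Places$Venue:
option java_package = "com.oreilly.learningsparkexamples.proto";
option java_outer_classname = "Places";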
The exception log is:
16/11/30 09:34:27 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: java.lang.RuntimeException: Unable to find proto buffer class
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1141)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138)
... 20 more
Caused by: java.lang.ClassNotFoundException: com.oreilly.learningsparkexamples.proto.Places$Venue
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
... 37 more
As far as I can check, the protocol buffer class Places.Venue really is in the jar built from the project. Has anybody met this issue before? Any help is appreciated!
Upvotes: 2
Views: 2223
Reputation: 619
After a lot of searching, I finally solved it by adding
spark.serializer org.apache.spark.serializer.KryoSerializer
to the spark-defaults.conf file.
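The same setting can also be applied programmatically via SparkConf instead of editing spark-defaults.conf (a minimal sketch; it assumes switching from the older SparkContext constructor used above to the one that takes a SparkConf):
import org.apache.spark.{SparkConf, SparkContext}

// Equivalent to the spark-defaults.conf line above: use Kryo instead of
// Java serialization, which avoids the protobuf readResolve/class-loading
// path that failed in the stack trace.
val sparkConf = new SparkConf()
  .setAppName("BasicSaveProtoBuf")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(sparkConf)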
Upvotes: 1
Reputation: 652
Your exception
Caused by: java.lang.ClassNotFoundException: com.oreilly.learningsparkexamples.proto.Places$Venue
states that the class
com.oreilly.learningsparkexamples.proto.Places$Venue
was not found on your classpath.
You can add a jar containing this class to the spark.executor.extraClassPath option in spark-defaults.conf so that it is on the classpath of every executor, since you access it in the rdd.map function (which runs in an executor process):
val outputData = data.map { pb =>
  val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue])
  protoWritable.set(pb)
  (null, protoWritable)
}
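For example, in spark-defaults.conf (the jar path here is hypothetical; point it at whatever jar holds your generated protobuf classes):
# spark-defaults.conf
spark.executor.extraClassPath /path/to/your-protos.jar
Note that spark.executor.extraClassPath only prepends to the executor classpath; the jar must already be present at that path on each worker node, or be shipped separately (for example with the --jars option of spark-submit).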
All Spark properties and default values are here: Spark Configuration
Upvotes: 0