Coinnigh

Reputation: 619

Unable to find proto buffer class in spark

I have been learning Spark recently and ran into an issue with protocol buffers. When I ran the code below in Spark, I got "java.lang.RuntimeException: Unable to find proto buffer class".

import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat
import com.oreilly.learningsparkexamples.proto.Places

object BasicSaveProtoBuf {
  def main(args: Array[String]) {
    val master = args(0)
    val outputFile = args(1)
    val sc = new SparkContext(master, "BasicSaveProtoBuf", System.getenv("SPARK_HOME"))
    val conf = new Configuration()
    // Tell elephant-bird which protobuf class the output format should write
    LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue], conf)
    val dnaLounge = Places.Venue.newBuilder()
    dnaLounge.setId(1)
    dnaLounge.setName("DNA Lounge")
    dnaLounge.setType(Places.Venue.VenueType.CLUB)
    val data = sc.parallelize(List(dnaLounge.build()))
    val outputData = data.map { pb =>
      val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue])
      protoWritable.set(pb)
      (null, protoWritable)
    }
    outputData.saveAsNewAPIHadoopFile(outputFile, classOf[Text], classOf[ProtobufWritable[Places.Venue]],
      classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]], conf)
  }
}

And places.proto is:

message Venue {
    required int32 id = 1;
    required string name = 2;
    required VenueType type = 3;
    optional string address = 4;

    enum VenueType {
        COFFEESHOP = 0;
        WORKPLACE = 1;
        CLUB = 2;
        OMNOMNOM = 3;
        OTHER = 4;
    }
}

The exception log is:

16/11/30 09:34:27 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: java.lang.RuntimeException: Unable to find proto buffer class
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1141)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
    at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1138)
    ... 20 more
Caused by: java.lang.ClassNotFoundException: com.oreilly.learningsparkexamples.proto.Places$Venue
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:191)
    at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
    ... 37 more

When I check, the protocol buffer class Places.Venue really is in the jar built from the project. Has anybody met this issue before? Any help is appreciated!

Upvotes: 2

Views: 2223

Answers (2)

Coinnigh

Reputation: 619

After a lot of searching, I finally solved it by adding

spark.serializer  org.apache.spark.serializer.KryoSerializer

to the spark-defaults.conf file.
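For reference, the same setting can also be passed per job on the command line instead of editing spark-defaults.conf (the jar name below is just a placeholder for your application jar):

```
spark-submit \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class BasicSaveProtoBuf \
  your-app.jar local[*] /tmp/output
```

Kryo avoids the problem because it does not go through Java serialization's `readResolve` path in `GeneratedMessageLite`, which is where the class lookup was failing.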

Upvotes: 1

Artur Sukhenko

Reputation: 652

Your exception

Caused by: java.lang.ClassNotFoundException: com.oreilly.learningsparkexamples.proto.Places$Venue

states that the class

com.oreilly.learningsparkexamples.proto.Places$Venue

is not found on your classpath.

You can add a jar containing this class to the spark.executor.extraClassPath option in spark-defaults.conf so that it is available to each executor, since you access it inside the rdd.map function (which runs in the executor process):

val outputData = data.map{ pb =>
  val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
  protoWritable.set(pb)
  (null, protoWritable)
}

All Spark properties and their default values are listed here: Spark Configuration
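For example, assuming the application jar sits at the same path on every worker node (the path below is a placeholder), the spark-defaults.conf entry would look like:

```
spark.executor.extraClassPath  /opt/jars/learning-spark-examples.jar
```

Alternatively, passing the jar via `spark-submit --jars /opt/jars/learning-spark-examples.jar` ships it to the executors automatically, without requiring it to be pre-deployed on each node.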

Upvotes: 0
