bernie2436
bernie2436

Reputation: 23921

What is the meaning of this error from Apache Spark?

I am learning Spark 1.2 by running it on my local machine with one master and one worker. I start spark by running .sbin/start-all.sh

The master and worker turn on and I can see them in the ui. If I run the sample word count program from github, it works if I configure the spark context like this:

String[] jars = {"pathto/nlp.jar"};
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://myurl:7077").setJars(jars);

In my java, I break a big document into sentences like this:

JavaRDD<Iterator<List<HasWord>>> sentences = lines.flatMap(new FlatMapFunction<String, Iterator<List<HasWord>>>() {
      /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
      public Iterable<Iterator<List<HasWord>>> call(String s) {
          return (Iterable<Iterator<List<HasWord>>>) new DocumentPreprocessor(s).iterator();
      }
});

So far so good.

Then I print out a count of the RDDs

System.out.println(sentences.count()); // This works fine. Prints an integer

Now I want to try to filter out some of the sentences (for now, I just will filter all of them by always just returning true).

sentences = sentences.filter(new Function<Iterator<List<HasWord>>, Boolean>() {
  /**
     * 
     */
    private static final long serialVersionUID = 2L;

@Override
  public Boolean call(Iterator<List<HasWord>> s) {
    return true;
  }
});

The function runs fine. But if I then go and run

System.out.println(sentences.count());

I get a long stack trace:

15/01/30 16:47:18 INFO DAGScheduler: Job 0 failed: count at JavaWordCount.java:134, took 1.203987 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 17, lens.att.net): java.io.InvalidClassException: nlp.nlp.JavaWordCount$1; local class incompatible: stream classdesc serialVersionUID = 1, local class serialVersionUID = 8625903781884920246
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I also get a (different) stack trace if I do not declare the serial id.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 68, lens.att.net): java.io.InvalidClassException: nlp.nlp.JavaWordCount$2; local class incompatible: stream classdesc serialVersionUID = 3752701569517815536, local class serialVersionUID = 6132153642693122455
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It seems like some class is not declaring a serial ID properly. But I get an error regardless of whether I include the serial ids (as shown above)

Notes

I am running this in eclipse. I have a maven project in eclipse with these configs:

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <version>1.2.0</version>
</dependency>

I am also running spark on my local machine. I downloaded to a directory pathto/spark-1.2.0-bin-hadoop2.4

Upvotes: 0

Views: 4488

Answers (2)

bernie2436
bernie2436

Reputation: 23921

In my case, this seems to be an issue that arises when your the spark program is not in sync with its jar dependencies.

My program loads jars like this

String[] jars = {"pathto/mydependencies.jar"};
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("spark://mylaptop:7077").setJars(jars);

If I make a change to the main program and then run it in debug mode in eclipse, I get this error. But if I re-export to pathto/mydependencies.jar it fixes it.

Upvotes: 1

Stephen C
Stephen C

Reputation: 719446

What needs the serial id? What is going wrong here?

The class that the exception complains about is nlp.nlp.JavaWordCount$1. That is the "name" of an anonymous inner class.

Looking at your code, I'd say it is your anonymous FlatMapFunction class. (The clue is that you are seeing an ID of "1" in the error message.)


Are you using the same JAR files on the serialization and deserialization sides? If not, my guess is that one of the sides is missing the:

 private static final long serialVersionUID = 1L;

The fix should be to use the same JARs.

But if the JARs are already identical ... this is weird.

As a possible workaround, try converting the anonymous inner class to a (named) nested class ... or even an outer class. If that works, you can use that data point to help you track down the real problem.

If you are using different versions of Spark in the same cluster, that could be the cause. Using the same version everywhere is advisable.

Upvotes: 2

Related Questions