Reputation: 73
I am writing my first Spark program in Java and can't figure out the error below. I have gone through a lot of questions on Stack Overflow, but they don't seem relevant to my issue. I am using the latest version of Spark, 2.4.4, and I am running my application locally.
Here is the program I have:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkTextFile {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("textfilereading").setMaster("local[*]");
        JavaSparkContext context = new JavaSparkContext(conf);
        JavaRDD<String> textRDD = context.textFile("/Users/user/Downloads/AccountHistory.csv");
        textRDD.foreach(System.out::println);
        context.close();
    }
}
Here are the dependencies in my pom file:
<dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>2.4.4</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>3.0.8</version>
</dependency>
<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>janino</artifactId>
    <version>3.0.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.2.1</version>
</dependency>
Here is the error I am getting:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:926)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
at org.apache.spark.api.java.JavaRDDLike.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.JavaRDDLike.foreach$(JavaRDDLike.scala:350)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
Caused by: java.io.NotSerializableException: java.io.PrintStream
Serialization stack:
- object not serializable (class: java.io.PrintStream, value: java.io.PrintStream@4df39a88)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.SparkTextFile, functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeVirtual java/io/PrintStream.println:(Ljava/lang/String;)V, instantiatedMethodType=(Ljava/lang/String;)V, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class com.SparkTextFile$$Lambda$622/1779219567, com.SparkTextFile$$Lambda$622/1779219567@a137d7a)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$foreach$1$adapted:(Lorg/apache/spark/api/java/function/VoidFunction;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.api.java.JavaRDDLike$$Lambda$623/1871259950, org.apache.spark.api.java.JavaRDDLike$$Lambda$623/1871259950@4ab550d5)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 12 more
I am not sure why I'm getting that error, since I am not serializing any objects; I'm only reading from a file.
I changed the line below:
textRDD.foreach(System.out::println);
to
textRDD.collect().forEach(System.out::println);
I added collect to see what the output would be, and now I see a different error message:
Exception in thread "main" java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapred.FileInputFormat
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:312)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:361)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:360)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
I don't understand what the above error means either. Can someone please explain how to interpret that error and how it can be fixed?
Upvotes: 1
Views: 849
Reputation: 71
(Disclaimer: I'm not a Java developer, so I will try to answer based on my experience with Scala.)
You're using a higher-order function here: foreach. Higher-order functions serialize the argument(s) supplied to them and send them to the partitions of the RDD, which are usually spread across machines over the network. I'm not sure whether System.out::println yields a serializable object in Java. So, one way around this could be to use a lambda expression in Java and change the code above as follows:
textRDD.foreach(s -> System.out.println(s));
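If it helps, here is a minimal sketch of the question's program with that change (same class name and file path as in the question; untested on my side). As far as I understand Java closures, the method reference System.out::println captures the System.out PrintStream itself, which Spark then has to serialize, whereas the lambda only looks up System.out when it actually runs on the executor:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkTextFile {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("textfilereading").setMaster("local[*]");
        JavaSparkContext context = new JavaSparkContext(conf);
        JavaRDD<String> textRDD = context.textFile("/Users/user/Downloads/AccountHistory.csv");
        // The lambda captures nothing: System.out is resolved inside the lambda body
        // on the executor, so no PrintStream has to be serialized with the closure.
        textRDD.foreach(s -> System.out.println(s));
        context.close();
    }
}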
Hope it helps! :)
Upvotes: 1
Reputation: 23
Read the link below; it has all the information you need about your issue :) https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-apache-spark/
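For example, one common approach to this kind of problem is to create the non-serializable object inside the function that runs on the executors rather than capturing it from the driver. A rough, untested sketch of that idea applied to the question's textRDD variable:

textRDD.foreachPartition(lines -> {
    // Obtain the non-serializable resource (here, a PrintStream) on the executor,
    // so it is never part of the serialized closure sent from the driver.
    java.io.PrintStream out = System.out;
    while (lines.hasNext()) {
        out.println(lines.next());
    }
});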
Upvotes: 0