navige

Reputation: 2517

NotSerializableException: org.apache.hadoop.io.LongWritable

I know this question has been answered many times, but I have tried everything and still haven't found a solution. I have the following code, which throws a NotSerializableException:

val ids : Seq[Long] = ...
ids.foreach{ id =>
 sc.sequenceFile("file", classOf[LongWritable], classOf[MyWritable]).lookup(new LongWritable(id))
}

With the following exception

Caused by: java.io.NotSerializableException: org.apache.hadoop.io.LongWritable
Serialization stack:
...
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

When creating the SparkContext, I do

val sparkConfig = new SparkConf().setAppName("...").setMaster("...")
sparkConfig.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConfig.registerKryoClasses(Array(classOf[BitString[_]], classOf[MinimalBitString], classOf[org.apache.hadoop.io.LongWritable]))
sparkConfig.set("spark.kryoserializer.classesToRegister", "org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,org.apache.hadoop.io.LongWritable")

Looking at the Environment tab, I can see these entries. However, I do not understand why:

  1. the Kryo serializer does not seem to be used (the stack trace does not mention Kryo)
  2. LongWritable is not serialized.

I'm using Apache Spark v. 1.5.1

Upvotes: 1

Views: 2026

Answers (3)

suraj vijayakumar

Reputation: 57

Try this solution. It worked fine for me.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkMapReduceApp");

conf.registerKryoClasses(new Class<?>[]{
    LongWritable.class,
    Text.class
});

Upvotes: 0

Kshitij Kulshrestha

Reputation: 2072

I'm new to Apache Spark, but I tried to solve your problem; please evaluate whether it helps. The serialization error occurs because, for Spark, Hadoop's LongWritable and the other Writables are not serializable.

val temp_rdd = sc.parallelize(ids.map(id =>
  sc.sequenceFile("file", classOf[LongWritable], classOf[LongWritable]).toArray.toSeq
)).flatMap(identity)

ids.foreach(id => temp_rdd.lookup(new LongWritable(id)))
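
For comparison, here is a minimal sketch (not part of the original answer) that avoids putting any Writable into a Spark closure at all: convert keys and values to plain types right after loading, cache, and look up by plain Long. `v.toString` stands in for whatever real conversion the question's MyWritable needs.

import org.apache.hadoop.io.LongWritable

// Load once and immediately convert the Hadoop Writables to plain Scala types.
// Copying here matters because Hadoop input formats may reuse Writable instances
// between records; v.toString is only a placeholder for a real conversion.
val byId = sc
  .sequenceFile("file", classOf[LongWritable], classOf[MyWritable])
  .map { case (k, v) => (k.get, v.toString) }

byId.cache()

// The lookup key is now a plain Long, so nothing non-serializable
// ends up in the task closure.
ids.foreach(id => byId.lookup(id))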

Upvotes: 0

zero323

Reputation: 330393

  1. Repeatedly loading the same data inside a loop is extremely inefficient. If you perform actions against the same data, load it once and cache it:

    val rdd = sc
      .sequenceFile("file", classOf[LongWritable], classOf[MyWritable])
    
    rdd.cache()
    
  2. Spark doesn't consider Hadoop Writables to be serializable. There is an open JIRA (SPARK-2421) for this. To handle LongWritables, a simple get should be enough:

    rdd.map{case (k, v) => k.get()}
    

    Regarding your custom class, it is your responsibility to deal with this problem (one possible approach is sketched after this list).

  3. Effective lookup requires a partitioned RDD. Otherwise it has to search every partition in your RDD.

    import org.apache.spark.HashPartitioner
    
    val numPartitions: Int = ???
    val partitioned = rdd.partitionBy(new HashPartitioner(numPartitions))
    
  4. Generally speaking, RDDs are not designed for random access. Even with a defined partitioner, lookup has to linearly search the candidate partition. With 5000 uniformly distributed keys and 10M objects in an RDD, that most likely means a repeated search over the whole RDD. You have a few options to avoid that:

    • filter

      val idsSet = sc.broadcast(ids.toSet)
      rdd.filter{case (k, v) => idsSet.value.contains(k.get)}
      
    • join

      val idsRdd = sc.parallelize(ids).map((_, null))
      idsRdd.join(rdd.map{case (k, v) => (k.get, v)}).map{case (k, (_, v)) => (k, v)}
      
    • IndexedRDD - it doesn't look like a particularly active project, though

  5. With 10M entries you'll probably be better off searching locally in memory than using Spark (a rough sketch follows below). For larger data you should consider using a proper key-value store.
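
A rough sketch of that local approach, assuming the whole data set fits in driver memory; `v.toString` again stands in for whatever real conversion MyWritable needs:

    // Convert the Writables to plain types while still distributed,
    // then bring everything to the driver once.
    val local: Map[Long, String] =
      sc.sequenceFile("file", classOf[LongWritable], classOf[MyWritable])
        .map { case (k, v) => (k.get, v.toString) }  // placeholder conversion
        .collectAsMap()
        .toMap

    // Pure local lookups, no Spark job per id.
    val found = ids.flatMap(id => local.get(id))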
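
Going back to point 2, one option for a custom Writable is to copy each record into a plain, serializable case class right after loading and work with that RDD from then on. This is only a sketch; MyRecord and its fields are hypothetical, not part of the original answer:

    // A plain, serializable representation of a record.
    // The fields are hypothetical; adapt them to MyWritable's contents.
    case class MyRecord(id: Long, payload: String)

    val plainRdd = sc
      .sequenceFile("file", classOf[LongWritable], classOf[MyWritable])
      .map { case (k, v) =>
        // Copy out of the Writables immediately: Hadoop input formats
        // may reuse the same Writable instances between records.
        MyRecord(k.get, v.toString)  // v.toString stands in for a real accessor
      }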

Upvotes: 4
