Gupta

Reputation: 21

Apache Spark: iterating through an RDD gives an error when using mapPartitionsToPair

I am using the mapPartitionsToPair function on a JavaPairRDD as follows:

JavaPairRDD<MyKeyClass, MyValueClass> myRDD;

JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass,MyValueClass>>, Integer, Double>(){

  public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> arg0) throws Exception {

  Tuple2<MyKeyClass, MyValueClass> temp = arg0.next(); //The error is coming here...
  TreeMap<Integer, Double> dic = new TreeMap<Integer, Double>();

  do{

   ........
   // Some Code to compute to newIntegerValue and newDoubleValue from temp
   ........

   dic.put(newIntegerValue, newDoubleValue);
   temp = arg0.next();

   }while(arg0.hasNext());

   // Return the (key, value) pairs accumulated in dic
   List<Tuple2<Integer, Double>> result = new ArrayList<Tuple2<Integer, Double>>();
   for (Map.Entry<Integer, Double> entry : dic.entrySet()) {
     result.add(new Tuple2<Integer, Double>(entry.getKey(), entry.getValue()));
   }
   return result;
  }

});

I am able to run this in Apache Spark pseudo-distributed mode, but the same code fails on my cluster. I am getting the following error:

java.util.NoSuchElementException: next on empty iterator
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
    at IncrementalGraph$6.call(MySparkJob.java:584)
    at IncrementalGraph$6.call(MySparkJob.java:573)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

I am using Spark 1.2.0 over Hadoop 2.2.0.

Can anyone help me fix this issue?

Update: hasNext() returns true before the call to next() on the iterator.

Upvotes: 1

Views: 2519

Answers (2)

Gupta

Reputation: 21

I found the answer.

I had set the storage level of myRDD to MEMORY_ONLY. Before the mapPartitionsToPair transformation, I had the following line in my code:

myRDD.persist(StorageLevel.MEMORY_ONLY());

I removed that line and the program ran correctly.

I don't know why that fixed it. If someone can explain it, that would be highly appreciated.

Upvotes: 1

Holden

Reputation: 7452

Your code assumes that every iterator passed in will have some elements, but this isn't the case: some partitions can be empty (especially with small test data sets). It's a pretty common pattern to check whether the iterator is empty at the start of your mapPartitions code and return an empty iterator if it is. Hope that helps :)
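
For illustration, here is a minimal sketch of that guard applied to the question's transformation. It reuses myRDD, MyKeyClass, and MyValueClass from the question as placeholders, and the per-element computation is left as a comment, so treat it as a sketch rather than a tested implementation:

JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
      public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> it) throws Exception {
        List<Tuple2<Integer, Double>> out = new ArrayList<Tuple2<Integer, Double>>();
        // Guard against empty partitions: return an empty result instead of calling next().
        if (!it.hasNext()) {
          return out;
        }
        // Check hasNext() before every next(), so next() is never called on an exhausted iterator.
        while (it.hasNext()) {
          Tuple2<MyKeyClass, MyValueClass> temp = it.next();
          // ... compute newIntegerValue and newDoubleValue from temp (elided in the question) ...
          // out.add(new Tuple2<Integer, Double>(newIntegerValue, newDoubleValue));
        }
        return out;
      }
    });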

Upvotes: 0
