Reputation: 21
I am using the mapPartitionsToPair function on a JavaPairRDD as follows:
JavaPairRDD<MyKeyClass, MyValueClass> myRDD;
JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
    public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> arg0) throws Exception {
        Tuple2<MyKeyClass, MyValueClass> temp = arg0.next(); // The error is coming from here...
        TreeMap<Integer, Double> dic = new TreeMap<Integer, Double>();
        do {
            ........
            // Some code to compute newIntegerValue and newDoubleValue from temp
            ........
            dic.put(newIntegerValue, newDoubleValue);
            temp = arg0.next();
        } while (arg0.hasNext());
    }
});
This runs fine on Apache Spark in pseudo-distributed mode, but the same code fails on my cluster with the following error:
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
at IncrementalGraph$6.call(MySparkJob.java:584)
at IncrementalGraph$6.call(MySparkJob.java:573)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$9$1.apply(JavaRDDLike.scala:186)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
I am using Spark 1.2.0 over Hadoop 2.2.0.
Can anyone help me fix this issue?
Update: hasNext() returns true before the first call to next() on the iterator.
Upvotes: 1
Views: 2519
Reputation: 21
I found the answer.
I had set the storage level of myRDD to MEMORY_ONLY. Before the mapPartitionsToPair transformation, I had the following line in my code:
myRDD.persist(StorageLevel.MEMORY_ONLY());
I removed that line and it fixed the program.
I don't know why it fixed it; if someone can explain it, that would be highly appreciated.
Upvotes: 1
Reputation: 7452
Your code assumes that every iterator passed in will contain at least one element, but that isn't guaranteed: some partitions can be empty (especially with small test data sets). It's a pretty common pattern to check at the start of your mapPartitions
code whether the iterator is empty and, if so, return an empty iterator. Hope that helps :)
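For reference, here is a minimal sketch of that guard, assuming the Spark 1.x PairFlatMapFunction API (where call returns an Iterable) and reusing the MyKeyClass/MyValueClass types from the question; the per-element computation is left as a placeholder, just as in the original snippet:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

JavaPairRDD<Integer, Double> myResult = myRDD.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Tuple2<MyKeyClass, MyValueClass>>, Integer, Double>() {
    public Iterable<Tuple2<Integer, Double>> call(Iterator<Tuple2<MyKeyClass, MyValueClass>> it) throws Exception {
        List<Tuple2<Integer, Double>> result = new ArrayList<Tuple2<Integer, Double>>();
        // Guard against empty partitions: return an empty result instead of calling next().
        if (!it.hasNext()) {
            return result;
        }
        // Check hasNext() before every next() call, rather than using a do-while loop.
        while (it.hasNext()) {
            Tuple2<MyKeyClass, MyValueClass> temp = it.next();
            // ... compute newIntegerValue and newDoubleValue from temp (omitted, as in the question) ...
            // result.add(new Tuple2<Integer, Double>(newIntegerValue, newDoubleValue));
        }
        return result;
    }
});

With this structure, an empty partition simply contributes no output pairs instead of throwing NoSuchElementException.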
Upvotes: 0