Reputation: 33
When using mapPartitionsToPair / PairFlatMapFunction in Spark, I found an example on the Internet like this:
spark.read().textFile(hdfsPath).javaRDD()
    .mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
        public Iterable<Tuple2<String, String>> call(Iterator<String> input) {
            List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
            while (input.hasNext()) result.add(doSomeThing(input.next()));
            return result;
        }
    });
but I get an error when compiling:
return type Iterable<Tuple2<String,String>> is not compatible with Iterator<Tuple2<String,String>>
I found the declaration of call:
java.util.Iterator<scala.Tuple2<K,V>> call(T t)
so call should return an Iterator.
Could someone help me with how to return an Iterator using the JavaRDD API in Spark? Thanks a lot.
PS: I tried the code below, but it doesn't work on the cluster:
public Iterator<Tuple2<String, String>> call(Iterator<String> input) {
    List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
    while (input.hasNext()) result.add(doSomeThing(input.next()));
    return result.iterator();
}
Upvotes: 0
Views: 2367
Reputation: 2333
It looks like a Spark version mismatch between your development environment and the cluster.
Starting with Spark 2.0.0, the Java RDD flatMap and mapPartitions family of functions (including mapPartitionsToPair) expect the function to return a Java Iterator instead of an Iterable.
So, if your cluster runs a version older than Spark 2.0.0, use that same Spark version in your development environment as well.
For Spark 2.0.0 or later,
public Iterator<Tuple2<String, String>> call(Iterator<String> input) {
    List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
    while (input.hasNext()) result.add(doSomeThing(input.next()));
    return result.iterator();
}
should work.
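For completeness, here is a minimal end-to-end sketch of how that call method plugs into mapPartitionsToPair on Spark 2.x. The class name, the hdfsPath value, and the doSomeThing helper are placeholders standing in for your own code (here doSomeThing just splits each line on its first comma):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class MapPartitionsToPairExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("mapPartitionsToPair-example")
                .getOrCreate();

        // Placeholder path; substitute your real HDFS location.
        String hdfsPath = "hdfs:///tmp/input.txt";

        JavaPairRDD<String, String> pairs = spark.read().textFile(hdfsPath).javaRDD()
                .mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
                    @Override
                    public Iterator<Tuple2<String, String>> call(Iterator<String> input) {
                        List<Tuple2<String, String>> result = new ArrayList<>();
                        while (input.hasNext()) {
                            result.add(doSomeThing(input.next()));
                        }
                        // Spark 2.x expects an Iterator here, not an Iterable
                        return result.iterator();
                    }
                });

        // Trigger the computation and print a few results
        for (Tuple2<String, String> t : pairs.take(5)) {
            System.out.println(t._1() + " -> " + t._2());
        }

        spark.stop();
    }

    // Stand-in for the question's doSomeThing: split each line on its first comma.
    private static Tuple2<String, String> doSomeThing(String line) {
        String[] parts = line.split(",", 2);
        return new Tuple2<>(parts[0], parts.length > 1 ? parts[1] : "");
    }
}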
For Spark versions older than 2.0.0,
public Iterable<Tuple2<String, String>> call(Iterator<String> input) {
    List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
    while (input.hasNext()) result.add(doSomeThing(input.next()));
    return result;
}
should work.
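To confirm which Spark version the cluster is running, you can run spark-submit --version on a cluster node and make sure the Spark dependency version in your build file matches it.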
Upvotes: 4