zzzzzzzz

Reputation: 33

How to return an Iterator when using mapPartitionsToPair / PairFlatMapFunction

When using mapPartitionsToPair / PairFlatMapFunction in Spark, I found an example on the Internet like this:

spark.read().textFile(hdfsPath).javaRDD()
  .mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
    public Iterable<Tuple2<String, String>> call(Iterator<String> input) {
      // collect one (key, value) pair per input line
      List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
      while (input.hasNext()) result.add(doSomeThing(input.next()));
      return result;
    }
  });

but I get an error when compiling:

return type Iterable<Tuple2<String,String>> is not compatible with Iterator<Tuple2<String,String>>

I found the declaration of call:

java.util.Iterator<scala.Tuple2<K,V>> call(T t) 

so call should return an Iterator.

Could someone show me how to return an Iterator from call in Spark's JavaRDD API? Thanks a lot.

PS: I tried the code below, but it doesn't work on the cluster:

public Iterator<Tuple2<String, String>> call(Iterator<String> input) {
  List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
  while (input.hasNext()) result.add(doSomeThing(input.next()));
  return result.iterator();
}

Upvotes: 0

Views: 2367

Answers (1)

Darshan

Reputation: 2333

This looks like a Spark version mismatch between your development environment and the cluster.

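Starting with Spark 2.0.0, the call methods of the Java RDD function interfaces used by flatMap and mapPartitions (such as FlatMapFunction and PairFlatMapFunction) return a java.util.Iterator, not an Iterable.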

So if your cluster runs a Spark version older than 2.0.0, compile against that same version during development.

For Spark 2.0.0 or later:

public Iterator<Tuple2<String, String>> call(Iterator<String> input) {
  List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
  while (input.hasNext()) result.add(doSomeThing(input.next()));
  return result.iterator();  // 2.x: return an Iterator over the collected pairs
}

should work.
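Collecting everything into an ArrayList first keeps the whole partition's output in memory. Since call only has to return an Iterator, one alternative is to wrap the input iterator and transform each element on demand. The sketch below is a minimal, hedged illustration of that idea, runnable without Spark: PairFlatMapLike is a hypothetical stand-in that only mirrors the 2.x return type of PairFlatMapFunction.call, Map.Entry stands in for scala.Tuple2, and doSomeThing is a placeholder body as in the question. In real Spark code you would implement PairFlatMapFunction<Iterator<String>, String, String> and return Tuple2 values instead.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;

public class LazyPartitionDemo {

    // Hypothetical stand-in for Spark 2.x's PairFlatMapFunction<T, K, V>:
    // it only mirrors the fact that call() returns an Iterator.
    interface PairFlatMapLike<T, K, V> {
        Iterator<Map.Entry<K, V>> call(T t);
    }

    // Placeholder for the question's doSomeThing(): pairs a line with its upper-case form.
    static Map.Entry<String, String> doSomeThing(String line) {
        return new AbstractMap.SimpleEntry<>(line, line.toUpperCase());
    }

    // Wraps the partition iterator instead of copying it into a List,
    // so each pair is computed only when the consumer calls next().
    static final PairFlatMapLike<Iterator<String>, String, String> LAZY =
        input -> new Iterator<Map.Entry<String, String>>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public Map.Entry<String, String> next() {
                return doSomeThing(input.next());
            }
        };

    public static void main(String[] args) {
        Iterator<Map.Entry<String, String>> out =
            LAZY.call(Arrays.asList("a", "b").iterator());
        while (out.hasNext()) {
            Map.Entry<String, String> e = out.next();
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

The trade-off is that nothing is buffered, so the per-element work runs while the consumer iterates rather than up front.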

For Spark versions earlier than 2.0.0:

public Iterable<Tuple2<String, String>> call(Iterator<String> input) {
  List<Tuple2<String, String>> result = new ArrayList<Tuple2<String, String>>();
  while (input.hasNext()) result.add(doSomeThing(input.next()));
  return result;  // 1.x: the List itself is the Iterable
}

should work.
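When porting many 1.x-style bodies to 2.x, the only required change is that call must hand back result.iterator() instead of result. A small adapter can do that in one place. This is a hedged sketch, runnable without Spark: LegacyPairFlatMap and PairFlatMapLike are hypothetical stand-ins that mirror only the 1.x and 2.x return types, Map.Entry replaces scala.Tuple2, and the LEGACY body is an illustrative example. Spark's real function interfaces also extend Serializable, which these stand-ins omit.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class IterableToIteratorAdapter {

    // Hypothetical stand-in for the Spark 1.x shape: call() returns an Iterable.
    interface LegacyPairFlatMap<T, K, V> {
        Iterable<Map.Entry<K, V>> call(T t);
    }

    // Hypothetical stand-in for the Spark 2.x shape: call() returns an Iterator.
    interface PairFlatMapLike<T, K, V> {
        Iterator<Map.Entry<K, V>> call(T t);
    }

    // Adapts a 1.x-style body to the 2.x shape by calling iterator() on its result.
    static <T, K, V> PairFlatMapLike<T, K, V> adapt(LegacyPairFlatMap<T, K, V> legacy) {
        return t -> legacy.call(t).iterator();
    }

    // Illustrative 1.x-style body: pairs each line with its length as a String.
    static final LegacyPairFlatMap<Iterator<String>, String, String> LEGACY = input -> {
        List<Map.Entry<String, String>> result = new ArrayList<>();
        while (input.hasNext()) {
            String line = input.next();
            result.add(new AbstractMap.SimpleEntry<>(line, Integer.toString(line.length())));
        }
        return result;
    };

    public static void main(String[] args) {
        Iterator<Map.Entry<String, String>> out =
            adapt(LEGACY).call(Arrays.asList("spark", "rdd").iterator());
        out.forEachRemaining(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```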

Upvotes: 4
