Aviral Kumar

Reputation: 824

FlatMap function on a CoGrouped RDD

I am trying to apply a flatMap function to a cogrouped RDD, which has the type:

JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>>
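To make the shape of this type concrete, here is a plain-Java sketch with no Spark dependency of what cogroup produces: every key present in either input maps to one collection of values from the first input and one from the second, and either collection may be empty. The class and method names, and the use of String values in place of Row, are illustrative assumptions, not Spark API.

```java
import java.util.*;

public class CoGroupShape {
    // Convenience constructor for a key/value pair (stands in for a Tuple2 in a pair RDD).
    static Map.Entry<String, String> kv(String k, String v) {
        return new AbstractMap.SimpleEntry<>(k, v);
    }

    // Hypothetical in-memory analogue of left.cogroup(right): each key from either side
    // maps to [values from left, values from right]; a side with no values gets an empty list.
    // TreeMap is used only so the printed order is stable; Spark does not sort keys.
    static Map<String, List<List<String>>> cogroup(List<Map.Entry<String, String>> left,
                                                   List<Map.Entry<String, String>> right) {
        Map<String, List<List<String>>> out = new TreeMap<>();
        for (Map.Entry<String, String> e : left) {
            out.computeIfAbsent(e.getKey(), k -> emptyGroup()).get(0).add(e.getValue());
        }
        for (Map.Entry<String, String> e : right) {
            out.computeIfAbsent(e.getKey(), k -> emptyGroup()).get(1).add(e.getValue());
        }
        return out;
    }

    private static List<List<String>> emptyGroup() {
        List<List<String>> group = new ArrayList<>();
        group.add(new ArrayList<>()); // values from the left input
        group.add(new ArrayList<>()); // values from the right input
        return group;
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> grouped = cogroup(
            Arrays.asList(kv("a", "1"), kv("a", "2"), kv("b", "3")),
            Arrays.asList(kv("a", "x"), kv("c", "y")));
        System.out.println(grouped); // {a=[[1, 2], [x]], b=[[3], []], c=[[], [y]]}
    }
}
```

Each map entry corresponds to one element of coGroupedRDD, which is what the flatMap function receives as its Tuple2 argument: the key as t._1 and the two value collections as t._2._1 and t._2._2.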

My flatMap function is as follows:

static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
    @Override
    public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
    }
};

But I am getting a compilation error. I am sure it must be a syntax issue that I am not able to spot.

Full Code:

JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> coGroupedRDD = rdd1.cogroup(rdd2);
JavaRDD<Row> jd = coGroupedRDD.flatmap(setupF);
static FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row> setupF = new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
    @Override
    public Iterable<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        //logic
    }
};

Error:

The method flatmap(FlatMapFunction<Tuple2<String,Tuple2<Iterable<Row>,Iterable<Row>>>,Row>) is undefined for the type JavaPairRDD<String,Tuple2<Iterable<Row>,Iterable<Row>>>

Upvotes: 0

Views: 736

Answers (1)

MaxNevermind

Reputation: 2933

A wild guess: maybe you wrote your code against the Spark 1.6 API but are actually using a Spark 2.0 dependency? The API differs between these two releases.

Spark 1.6 API FlatMapFunction method signature:

Iterable<R> call(T t)

Spark 2.0 API FlatMapFunction method signature:

Iterator<R> call(T t) 
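In practice, a 1.6-style body that builds and returns an Iterable can usually be ported by returning that Iterable's iterator() instead. The plain-Java sketch below shows the adaptation using two hypothetical interfaces, IterableFn and IteratorFn, that only mimic the two call signatures; they are not Spark classes, and the real Spark interfaces also declare throws Exception on call.

```java
import java.util.*;

public class FlatMapPort {
    // Hypothetical stand-in for the Spark 1.6 shape: Iterable<R> call(T t)
    interface IterableFn<T, R> {
        Iterable<R> call(T t);
    }

    // Hypothetical stand-in for the Spark 2.0 shape: Iterator<R> call(T t)
    interface IteratorFn<T, R> {
        Iterator<R> call(T t);
    }

    // Wraps old-style logic so it satisfies the new-style signature.
    static <T, R> IteratorFn<T, R> port(IterableFn<T, R> oldFn) {
        return t -> oldFn.call(t).iterator();
    }

    // Drains an iterator into a list (used here only for printing and checking).
    static <R> List<R> drain(Iterator<R> it) {
        List<R> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        IterableFn<String, String> splitWords = line -> Arrays.asList(line.split(" "));
        IteratorFn<String, String> ported = port(splitWords);
        System.out.println(drain(ported.call("to be or not"))); // [to, be, or, not]
    }
}
```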

So try changing your code to this:

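new FlatMapFunction<Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>>, Row>() {
    @Override
    public Iterator<Row> call(Tuple2<String, Tuple2<Iterable<Row>, Iterable<Row>>> row) {
        List<Row> result = new ArrayList<>();
        // ... build result from row._1, row._2._1 and row._2._2
        return result.iterator(); // Spark 2.0 expects an Iterator, not an Iterable
    }
};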

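Or, using a Java 8 lambda (note that the method is flatMap with a capital M; Java is case-sensitive, so the flatmap call in the question will not compile against either Spark version):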

coGroupedRDD
    .flatMap(t -> {
        List<Row> result = new ArrayList<>();
        //...use t._1, t._2._1, t._2._2 to construct the result list
        return result.iterator();
    });
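What goes in the "//..." part depends on the question's "//logic", which isn't shown. As one hypothetical example, the plain-Java sketch below implements inner-join semantics for a single cogrouped key: it emits one combined value per (left, right) pair, so a key missing from either side produces nothing. String stands in for Row, the three parameters stand in for t._1, t._2._1 and t._2._2, and the name joinGroup is made up.

```java
import java.util.*;

public class CoGroupJoin {
    // Hypothetical per-key logic with inner-join semantics: one output per
    // (left, right) pair. Parameters mirror t._1, t._2._1 and t._2._2.
    static Iterator<String> joinGroup(String key, Iterable<String> left, Iterable<String> right) {
        List<String> result = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                result.add(key + ":" + l + ":" + r);
            }
        }
        // Returning an Iterator matches the Spark 2.0 FlatMapFunction shape.
        return result.iterator();
    }

    public static void main(String[] args) {
        Iterator<String> joined = joinGroup("k1", Arrays.asList("a", "b"), Arrays.asList("x"));
        joined.forEachRemaining(System.out::println); // k1:a:x then k1:b:x

        // A key present on only one side produces no output under inner-join semantics.
        System.out.println(joinGroup("k2", Arrays.asList("c"), Collections.<String>emptyList()).hasNext()); // false
    }
}
```

In a real job the same nested loops would run over t._2._1 and t._2._2 and build Row objects instead of strings; other semantics (left outer, anti-join) only change which pairs are emitted.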

Upvotes: 2
