JimLohse

Reputation: 1304

Apache Spark Function2, not getting declaration right

(This is based on trying to map an Integer RDD to a TholdDropResult RDD. We need to initialize a single SparkDoDrop to generate all 10^8 TholdDropResults, hence the use of mapPartitionsWithIndex, which I think is the only flavor of mapPartitions in Java that provides the type of function we need.)

Question: I am getting an error using org.apache.spark.api.java.function.Function2.

I am not able to figure out how to work the "boolean" into a new Function2.

When I try this code, scroll right to see the new Function2 declaration that appears to be giving me trouble (added builder-style formatting from answer):

JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(
                                      new Function2<Integer, 
                                      Iterator<Integer>, 
                                      Iterator<TholdDropResult>>(){

        @Override
        public Iterator<TholdDropResult> call(Integer partitionID, Iterator<Integer> integerIterator) throws Exception {
            //
            SparkDoDrop standin = makeNewSparkDoDrop();
            standin.initializeLI();
            List<TholdDropResult> rddToReturn = new ArrayList<>();
            while (integerIterator.hasNext()){
                rddToReturn.add(standin.call(integerIterator.next()));
            }
            return rddToReturn.iterator();

        }});
    dropResultsN.persist(StorageLevel.MEMORY_ONLY());

Here's the full error when I run gradle build:

JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>>(){
required: Function2<Integer,Iterator<Integer>,Iterator<R>>,boolean
  found: <anonymous Function2<Integer,Iterator<Integer>,Iterator<TholdDropResult>>>
  reason: cannot infer type-variable(s) R
    (actual and formal argument lists differ in length)
  where R,T,This are type-variables:
    R extends Object declared in method <R>mapPartitionsWithIndex(Function2<Integer,Iterator<T>,Iterator<R>>,boolean)
    T extends Object declared in class AbstractJavaRDDLike
    This extends JavaRDDLike<T,This> declared in class AbstractJavaRDDLike

When I try to place the Boolean arg in there like so: new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, Boolean>() I get an error:

error: wrong number of type arguments; required 3
            JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, Boolean>(){

Finally, if I use boolean instead of Boolean, I get two more errors:

error: unexpected type
            JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, boolean>(){
                                                                                                                                                         ^
  required: reference
  found:    boolean

error: wrong number of type arguments; required 3
            JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, boolean>(){

Upvotes: 1

Views: 818

Answers (2)

JimLohse

Reputation: 1304

This works. Separating out the Function2 made it clear that the boolean is a second argument to mapPartitionsWithIndex, not part of the Function2 declaration (of course, I have not compiled and run it yet :)

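        // Declare the variable with its type arguments; a raw Function2 compiles only with unchecked warnings
        Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>> makeLIThenDropResults =
                new Function2<Integer,
                              Iterator<Integer>,
                              Iterator<TholdDropResult>>() {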
            @Override
            public Iterator<TholdDropResult> call(Integer partitionID, Iterator<Integer> integerIterator) throws Exception {
                SparkDoDrop standin = makeNewSparkDoDrop();

                standin.initializeLI();
                List<TholdDropResult> rddToReturn = new ArrayList<>();
                while (integerIterator.hasNext()){
                    rddToReturn.add(standin.call(integerIterator.next()));
                }
                return rddToReturn.iterator();
            }
        };

        // now make the RDD of subset of N
        // setup bogus arrays of size N for parallelize to lead to dropResultsN
        JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(makeLIThenDropResults, true);

(hat tip to this answer on Apache Spark mapPartitionsWithIndex)
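To illustrate why the setup sits at the top of call() rather than inside the loop, here is a minimal plain-Java sketch of the per-partition pattern. It does not use Spark: ExpensiveHelper, processPartition, and the nested lists standing in for partitions are all hypothetical names invented for this example, with ExpensiveHelper playing the role of SparkDoDrop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class PerPartitionSetupSketch {

    // Hypothetical stand-in for an object that is costly to initialize
    // (the role SparkDoDrop plays in the code above).
    static class ExpensiveHelper {
        static int initCount = 0;

        ExpensiveHelper() {
            initCount++; // count how many times the setup runs
        }

        String process(int value) {
            return "result-" + value;
        }
    }

    // Shaped like the body of call(): one helper per partition, reused for every element.
    static Iterator<String> processPartition(Integer partitionId, Iterator<Integer> values) {
        ExpensiveHelper helper = new ExpensiveHelper();
        List<String> results = new ArrayList<>();
        while (values.hasNext()) {
            results.add(helper.process(values.next()));
        }
        return results.iterator();
    }

    public static void main(String[] args) {
        // Two hypothetical "partitions" holding five elements in total.
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5));
        int elements = 0;
        for (int i = 0; i < partitions.size(); i++) {
            Iterator<String> it = processPartition(i, partitions.get(i).iterator());
            while (it.hasNext()) {
                it.next();
                elements++;
            }
        }
        // prints "5 elements, 2 initializations"
        System.out.println(elements + " elements, " + ExpensiveHelper.initCount + " initializations");
    }
}
```

One thing to keep in mind with this shape: collecting into a List holds a whole partition's results in memory before returning the iterator, which may matter with 10^8 results.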

Upvotes: 0

Yuval Itzchakov

Reputation: 149608

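You need to close the Function2 type arguments with a > before the boolean, and pass the boolean as a second argument to mapPartitionsWithIndex rather than as a fourth type argument:

JavaRDD<TholdDropResult> dropResultsN =
   dataSetN.mapPartitionsWithIndex(new Function2<Integer,
                                                 Iterator<Integer>,
                                                 Iterator<TholdDropResult>>() {
                                       // ... call(...) exactly as in the question ...
                                   }, true);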

The signature of mapPartitionsWithIndex looks like this:

<R> JavaRDD<R> mapPartitionsWithIndex(Function2<java.lang.Integer,
                                                java.util.Iterator<T>,
                                                java.util.Iterator<R>> f,
                                                boolean preservesPartitioning)

The Function2 takes an Integer and an Iterator<T> and returns an Iterator<R>, so it has exactly three type parameters. The boolean is a separate parameter of mapPartitionsWithIndex itself; it is not declared inside the Function2.
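As a rough, Spark-free sketch of that distinction: the Function2 interface and the mapPartitionsWithIndex method below are hypothetical stand-ins written for this example (the real ones live in Spark and operate on RDDs, not lists). They only mirror the shape of the signature, to show where the boolean goes relative to the anonymous class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class Function2AritySketch {

    // Hypothetical stand-in for Spark's Function2:
    // two inputs (T1, T2) and one result (R), so exactly three type parameters.
    interface Function2<T1, T2, R> {
        R call(T1 v1, T2 v2) throws Exception;
    }

    // Hypothetical stand-in for mapPartitionsWithIndex. The boolean is an ordinary
    // method parameter that follows the function; this sketch accepts it but ignores it.
    static <T, R> List<R> mapPartitionsWithIndex(
            List<List<T>> partitions,
            Function2<Integer, Iterator<T>, Iterator<R>> f,
            boolean preservesPartitioning) throws Exception {
        List<R> out = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            Iterator<R> results = f.call(i, partitions.get(i).iterator());
            while (results.hasNext()) {
                out.add(results.next());
            }
        }
        return out;
    }

    static List<String> labelByPartition(List<List<Integer>> partitions) throws Exception {
        return mapPartitionsWithIndex(
                partitions,
                new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer partitionId, Iterator<Integer> values) {
                        List<String> labeled = new ArrayList<>();
                        while (values.hasNext()) {
                            labeled.add("p" + partitionId + ":" + values.next());
                        }
                        return labeled.iterator();
                    }
                },
                true); // the boolean goes here, after the anonymous class closes
    }

    public static void main(String[] args) throws Exception {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(3));
        // prints [p0:1, p0:2, p1:3]
        System.out.println(labelByPartition(partitions));
    }
}
```

Writing new Function2<..., Boolean> or new Function2<..., boolean> fails for the reasons the compiler gives in the question: Function2 accepts only three type arguments, and a primitive such as boolean cannot be a type argument at all.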

Upvotes: 1
