Reputation: 1304
(This is based on trying to map an Integer RDD to a TholdDropResult RDD, but we need to initialize one SparkDoDrop per partition (rather than one per element) and reuse it to generate all the (10^8) TholdDropResults. Hence the use of mapPartitionsWithIndex, which, methinks, is the only flavor of mapPartitions in Java that will provide the type of function we need.)
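The point of using a per-partition function is to pay for makeNewSparkDoDrop() and initializeLI() once per partition instead of once per element. The following plain-Java sketch shows that pattern without any Spark dependency. ExpensiveScorer is a hypothetical stand-in for SparkDoDrop, and mapPartitionsWithIndexLocal is a hypothetical local simulation of the Spark method over a List of partitions, not Spark's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

public class PerPartitionInit {
    // Hypothetical stand-in for SparkDoDrop: construction plays the role of
    // makeNewSparkDoDrop() + initializeLI(), the expensive step we want to amortize.
    static class ExpensiveScorer {
        static int initCount = 0; // how many times the expensive setup ran

        ExpensiveScorer() {
            initCount++;
        }

        String call(Integer x) {
            return "result-" + x;
        }
    }

    // Hypothetical local simulation of mapPartitionsWithIndex (not Spark's API):
    // f is invoked once per partition with (partition index, iterator over that partition).
    static <T, R> List<R> mapPartitionsWithIndexLocal(
            List<List<T>> partitions, BiFunction<Integer, Iterator<T>, Iterator<R>> f) {
        List<R> out = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            Iterator<R> it = f.apply(i, partitions.get(i).iterator());
            while (it.hasNext()) {
                out.add(it.next());
            }
        }
        return out;
    }

    static List<String> scoreAll(List<List<Integer>> partitions) {
        return mapPartitionsWithIndexLocal(partitions, (partitionId, it) -> {
            ExpensiveScorer scorer = new ExpensiveScorer(); // once per partition, not per element
            List<String> buf = new ArrayList<>();
            while (it.hasNext()) {
                buf.add(scorer.call(it.next()));
            }
            return buf.iterator();
        });
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 5), Arrays.asList(6));
        List<String> results = scoreAll(partitions);
        // 6 elements in 3 partitions: the expensive setup runs 3 times, not 6
        System.out.println(results.size() + " results, "
                + ExpensiveScorer.initCount + " initializations");
    }
}
```

Running main prints "6 results, 3 initializations": the setup cost scales with the number of partitions, not with the 10^8 elements.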
Question: I am getting an error using org.apache.spark.api.java.function.Function2
I am not able to figure out how to work the "boolean" into a new Function2.
When I try the following code, the new Function2 declaration appears to be what is giving me trouble (builder-style formatting added from the answer):
JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(
        new Function2<Integer,
                      Iterator<Integer>,
                      Iterator<TholdDropResult>>() {
            @Override
            public Iterator<TholdDropResult> call(Integer partitionID, Iterator<Integer> integerIterator) throws Exception {
                SparkDoDrop standin = makeNewSparkDoDrop();
                standin.initializeLI();
                List<TholdDropResult> rddToReturn = new ArrayList<>();
                while (integerIterator.hasNext()) {
                    rddToReturn.add(standin.call(integerIterator.next()));
                }
                return rddToReturn.iterator();
            }
        });
dropResultsN.persist(StorageLevel.MEMORY_ONLY());
Here's the full error when I run gradle build:
JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>>(){
  required: Function2<Integer,Iterator<Integer>,Iterator<R>>,boolean
  found: <anonymous Function2<Integer,Iterator<Integer>,Iterator<TholdDropResult>>>
  reason: cannot infer type-variable(s) R
    (actual and formal argument lists differ in length)
  where R,T,This are type-variables:
    R extends Object declared in method <R>mapPartitionsWithIndex(Function2<Integer,Iterator<T>,Iterator<R>>,boolean)
    T extends Object declared in class AbstractJavaRDDLike
    This extends JavaRDDLike<T,This> declared in class AbstractJavaRDDLike
When I try to place the Boolean arg in there like so:
new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, Boolean>()
I get an error:
error: wrong number of type arguments; required 3
JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, Boolean>(){
Finally, if I use boolean instead of Boolean, I get another error:
error: unexpected type
JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, boolean>(){
^
  required: reference
  found: boolean
error: wrong number of type arguments; required 3
JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>, boolean>(){
Upvotes: 1
Views: 818
Reputation: 1304
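This works. It turns out that separating out the Function2 is not itself the fix: what matters is that the boolean (true here) is passed as the second argument to mapPartitionsWithIndex, after the Function2, instead of as one of the Function2's type arguments. Declaring the variable with its full type (not a raw Function2) also avoids unchecked warnings. (Of course, I have not compiled and run it yet :)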
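import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;

Function2<Integer, Iterator<Integer>, Iterator<TholdDropResult>> makeLIThenDropResults =
        new Function2<Integer,
                      Iterator<Integer>,
                      Iterator<TholdDropResult>>() {
    @Override
    public Iterator<TholdDropResult> call(Integer partitionID, Iterator<Integer> integerIterator) throws Exception {
        // one SparkDoDrop per partition, reused for every element in that partition
        SparkDoDrop standin = makeNewSparkDoDrop();
        standin.initializeLI();
        List<TholdDropResult> rddToReturn = new ArrayList<>();
        while (integerIterator.hasNext()) {
            rddToReturn.add(standin.call(integerIterator.next()));
        }
        return rddToReturn.iterator();
    }
};
// now make the RDD of subset of N
// setup bogus arrays of size N for parallelize to lead to dropResultsN
// the second argument is the boolean preservesPartitioning
JavaRDD<TholdDropResult> dropResultsN = dataSetN.mapPartitionsWithIndex(makeLIThenDropResults, true);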
(hat tip to this answer on Apache Spark mapPartitionsWithIndex)
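To check that the boolean argument, not the separate variable, is what makes this compile, here is a plain-Java sketch that runs without Spark. Function2 is a local interface with the same shape as Spark's (three type parameters, one call method), and mapPartitionsWithIndex, sampleData and tag are hypothetical stand-ins, not Spark's API. Both a separate, fully typed variable and an inline anonymous class work once the boolean comes after the Function2.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class BooleanIsAnArgument {
    // Local stand-in with the same shape as org.apache.spark.api.java.function.Function2:
    // exactly three type parameters and a single call method.
    interface Function2<T1, T2, R> extends Serializable {
        R call(T1 v1, T2 v2) throws Exception;
    }

    // Hypothetical local stand-in for mapPartitionsWithIndex(f, preservesPartitioning).
    // The boolean is an ordinary method parameter; this sketch has no partitioner, so it is unused.
    static <T, R> List<R> mapPartitionsWithIndex(List<List<T>> partitions,
            Function2<Integer, Iterator<T>, Iterator<R>> f, boolean preservesPartitioning) {
        List<R> out = new ArrayList<>();
        try {
            for (int i = 0; i < partitions.size(); i++) {
                Iterator<R> it = f.call(i, partitions.get(i).iterator());
                while (it.hasNext()) {
                    out.add(it.next());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }

    static List<List<Integer>> sampleData() {
        return Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
    }

    // Shared body: tag each element with the index of the partition it came from.
    static Iterator<String> tag(Integer partitionID, Iterator<Integer> it) {
        List<String> buf = new ArrayList<>();
        while (it.hasNext()) {
            buf.add(partitionID + ":" + it.next());
        }
        return buf.iterator();
    }

    // Style 1: a separate, fully parameterized variable (not a raw Function2), then `true`.
    static List<String> viaNamedVariable() {
        Function2<Integer, Iterator<Integer>, Iterator<String>> named =
                new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer partitionID, Iterator<Integer> it) {
                        return tag(partitionID, it);
                    }
                };
        return mapPartitionsWithIndex(sampleData(), named, true);
    }

    // Style 2: inline anonymous class; the boolean goes after the closing brace.
    static List<String> viaInlineAnonymousClass() {
        return mapPartitionsWithIndex(sampleData(),
                new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer partitionID, Iterator<Integer> it) {
                        return tag(partitionID, it);
                    }
                }, true);
    }

    public static void main(String[] args) {
        System.out.println(viaNamedVariable());        // [0:1, 0:2, 1:3]
        System.out.println(viaInlineAnonymousClass()); // [0:1, 0:2, 1:3]
    }
}
```

The only structural difference from the original failing code is the ", true" after the anonymous class's closing brace.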
Upvotes: 0
Reputation: 149608
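You need to close the Function2 type arguments with > and then pass the boolean as a separate argument to mapPartitionsWithIndex, after the Function2 instance, rather than as a fourth type argument:
JavaRDD<TholdDropResult> dropResultsN =
    dataSetN.mapPartitionsWithIndex(new Function2<Integer,
                                                  Iterator<Integer>,
                                                  Iterator<TholdDropResult>>() {
        // call(Integer, Iterator<Integer>) implemented as in the question
    }, true); // the boolean is preservesPartitioning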
The signature of mapPartitionsWithIndex looks like this:
<R> JavaRDD<R> mapPartitionsWithIndex(Function2<java.lang.Integer,
                                                java.util.Iterator<T>,
                                                java.util.Iterator<R>> f,
                                      boolean preservesPartitioning)
The Function2 takes an Integer and an Iterator<T> and returns an Iterator<R>. The boolean (preservesPartitioning) is a separate parameter of mapPartitionsWithIndex itself; it is not one of the Function2's type arguments.
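Assuming Java 8 or later, the Function2 can also be written as a lambda, since it has a single abstract method (call). That avoids spelling out the three type arguments at all, and the boolean is simply the next argument; with Spark itself this would look like dataSetN.mapPartitionsWithIndex((partitionID, integerIterator) -> ..., true). The runnable sketch below again uses local stand-ins (a Function2-shaped interface and a hypothetical mapPartitionsWithIndex over a List of partitions) so it needs no Spark; partitionSizes is a hypothetical example that counts the elements in each partition.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class LambdaForm {
    // Local stand-in with the same shape as Spark's Java Function2.
    interface Function2<T1, T2, R> extends Serializable {
        R call(T1 v1, T2 v2) throws Exception;
    }

    // Hypothetical local stand-in mirroring the quoted signature:
    // <R> mapPartitionsWithIndex(Function2<Integer, Iterator<T>, Iterator<R>> f, boolean preservesPartitioning)
    static <T, R> List<R> mapPartitionsWithIndex(List<List<T>> partitions,
            Function2<Integer, Iterator<T>, Iterator<R>> f, boolean preservesPartitioning) {
        List<R> out = new ArrayList<>();
        try {
            for (int i = 0; i < partitions.size(); i++) {
                Iterator<R> it = f.call(i, partitions.get(i).iterator());
                while (it.hasNext()) {
                    out.add(it.next());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return out;
    }

    // No Function2<...> type arguments are written here: the compiler infers
    // T = Integer and R = Integer from the arguments, the lambda and the return type.
    static List<Integer> partitionSizes(List<List<Integer>> partitions) {
        return mapPartitionsWithIndex(partitions, (partitionID, it) -> {
            int n = 0;
            while (it.hasNext()) {
                it.next();
                n++;
            }
            return Arrays.asList(n).iterator();
        }, false); // the boolean is just the second argument
    }

    public static void main(String[] args) {
        List<List<Integer>> data = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4));
        System.out.println(partitionSizes(data)); // [3, 1]
    }
}
```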
Upvotes: 1