Max Song
Max Song

Reputation: 1687

Errors in Overloaded Spark RDD Function zipPartitions

I'm trying to use the zipPartitions function defined in Spark's RDD class (url to Spark Scala Docs here: http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.RDD).

The function is overloaded, and contains several implementations.

def    zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
def    zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
def    zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
def    zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
def    zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
def    zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

I defined a function, merge, with type signature:

merge(iter1: Iterator[(Int,Int)], iter2: Iterator[(Int,Int)]): Iterator[(Int,Int)]

and have two RDD's of type [Int].

However, when I do Rdd1.zipPartitions(Rdd2,merge), the spark shell throws an error and says:

error: missing arguments for method merge;
follow this method with `_' if you want to treat it as a partially applied function

This is strange, because elsewhere, I am able to pass a function as an argument into another method fine. However, if I add two _'s to merge, and try

Rdd1.zipPartitions(Rdd2,merge(_:Iterator[(Int,Int)], _: Iterator[(Int,Int)]), then I get a different error:

error: overloaded method value zipPartitions with alternatives:
  [B, C, D, V](rdd2: org.apache.spark.rdd.RDD[B], rdd3: org.apache.spark.rdd.RDD[C], rdd4: org.apache.spark.rdd.RDD[D])(f: (Iterator[(Int, Int)], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit evidence$34: scala.reflect.ClassTag[B], implicit evidence$35: scala.reflect.ClassTag[C], implicit evidence$36: scala.reflect.ClassTag[D], implicit evidence$37: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V] <and>
  [B, C, D, V](rdd2: org.apache.spark.rdd.RDD[B], rdd3: org.apache.spark.rdd.RDD[C], rdd4: org.apache.spark.rdd.RDD[D], preservesPartitioning: Boolean)(f: (Iterator[(Int, Int)], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit evidence$30: scala.reflect.ClassTag[B], implicit evidence$31: scala.reflect.ClassTag[C], implicit evidence$32: scala.reflect.ClassTag[D], implicit evidence$33: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V] <and>
  [B, C, V](rdd2: org.apache.spark.rdd.RDD[B], rdd3: org.apache.spark.rdd.RDD[C])(f: (Iterator[(Int, Int)], Iterator[B], Iterator[C]) => Iterator[V])(implicit evidence$27: scala.reflect.ClassTag[B], implicit evidence$28: scala.reflect.ClassTag[C], implicit evidence$29: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V] <and>
  [B, C, V](rdd2: org.apache.spark.rdd.RDD[B], rdd3: org.apache.spark.rdd.RDD[C], preservesPartitioning: Boolean)(f: (Iterator[(Int, Int)], Iterator[B], Iterator[C]) => Iterator[V])(implicit evidence$24: scala.reflect.ClassTag[B], implicit evidence$25: scala.reflect.ClassTag[C], implicit evidence$26: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V] <and>
  [B, V](rdd2: org.apache.spark.rdd.RDD[B])(f: (Iterator[(Int, Int)], Iterator[B]) => Iterator[V])(implicit evidence$22: scala.reflect.ClassTag[B], implicit evidence$23: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V] <and>
  [B, V](rdd2: org.apache.spark.rdd.RDD[B], preservesPartitioning: Boolean)(f: (Iterator[(Int, Int)], Iterator[B]) => Iterator[V])(implicit evidence$20: scala.reflect.ClassTag[B], implicit evidence$21: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V]
 cannot be applied to (org.apache.spark.rdd.RDD[(Int, Int)], (Iterator[(Int, Int)], Iterator[(Int, Int)]) => Iterator[(Int, Int)])

       val RDD_combined = RDD1.zipPartitions(RDD1:org.apache.spark.rdd.RDD[(Int, Int)],merge(_:Iterator[(Int,Int)],_:Iterator[(Int,Int)]))

I suspect the error lies in this bottom line:

The function definition that I'm trying to match with this call is:

[B, V](rdd2: org.apache.spark.rdd.RDD[B])(f: (Iterator[(Int, Int)], Iterator[B]) => Iterator[V])(implicit evidence$22: scala.reflect.ClassTag[B], implicit evidence$23: scala.reflect.ClassTag[V])org.apache.spark.rdd.RDD[V]

however, what scala sees is

val RDD_combined = RDD1.zipPartitions(RDD1:org.apache.spark.rdd.RDD[(Int, Int)],merge(_:Iterator[(Int,Int)],_:Iterator[(Int,Int)]))

where the [B] type parameter has already been converted to [(Int,Int)].

Any insights into how to get this to work would be very appreciated!

Upvotes: 0

Views: 452

Answers (1)

Sean Owen
Sean Owen

Reputation: 66876

If you look at the signature, you'll see that this is actually a function with multiple parameter lists, not one list with multiple parameters. The invocation you need is more like:

RDD1.zipPartitions(RDD1)(merge)

(Not sure about the type references you added in your original?)

There may still be some other adjustments you need to make this work, but that is the essence of fixing the error you currently see.

Upvotes: 2

Related Questions