Reputation: 3591
Let's say I have a bunch of RDDs, maybe RDD[Int], and I have a function that defines an operation on a sequence of ints and returns an int, like a fold: f: Seq[Int] => Int.
If I have a sequence of RDDs, Seq[RDD[Int]], how do I apply the function and return a single new RDD with the resulting values? I can't find a zipPartitions method in Spark that accomplishes this.
Upvotes: 1
Views: 4679
Reputation: 968
At some point the elements of the Seq[Int] need to be bound to the parameters of f. Whether this occurs beforehand by creating a collection ("materializing the lists") or by binding them one-by-one in a partial function application manner, at some point there needs to be a collection-like data structure that contains all of the elements. Certainly, once inside f, they all need to be in the same place.
Here is a slightly more functional style implementation of Spiro's makeZip function:
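import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD

def makeZip(xs: ListBuffer[RDD[Double]]): RDD[ListBuffer[Double]] = {
  // initialize with list buffers of length 1 (xs must be non-empty)
  val init = xs(0).map { ListBuffer(_) }
  // fold in the remaining RDDs by appending to the mutable list;
  // += returns the buffer itself, so the element type stays ListBuffer[Double]
  xs.drop(1).foldLeft(init) {
    (rddS, rddXi) => rddS.zip(rddXi).map(sx => sx._1 += sx._2)
  }
}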
Upvotes: 2
Reputation: 3571
A simplified example using zip instead of zipPartitions. (I don't see where your problem description actually requires zipPartitions.) The tricky part is the way zip wants to return an RDD of pairs when what you need is an RDD of lists.
import org.apache.spark.rdd.RDD

// set up an example (sc is an existing SparkContext, as in spark-shell)
val rdd1 = sc.parallelize(Array(1,2,3,4), 2)
val rdd2 = sc.parallelize(Array(11,12,13,14), 2)
val rdd3 = sc.parallelize(Array(21,22,23,24), 2)
val rdd4 = sc.parallelize(Array(31,32,33,34), 2)
val allRDDs = Seq(rdd1, rdd2, rdd3, rdd4)

// zip the RDDs into an RDD of Seq[Int] (s must be non-empty)
def makeZip(s: Seq[RDD[Int]]): RDD[Seq[Int]] = {
  if (s.length == 1)
    s.head.map(e => Seq(e))
  else {
    val others = makeZip(s.tail)
    val all = s.head.zip(others)
    all.map(elem => Seq(elem._1) ++ elem._2)
  }
}

// zip and apply arbitrary function from Seq[Int] to Int
def applyFuncToZip(s: Seq[RDD[Int]], f: Seq[Int] => Int): RDD[Int] = {
  val z = makeZip(s)
  z.map(f)
}

val res = applyFuncToZip(allRDDs, (s: Seq[Int]) => s.sum)
// collect to the driver before printing; a bare foreach prints on the executors
res.collect().foreach(println)
If you really wanted to avoid materializing the lists, and instead wanted to apply the function incrementally, the solution would be more complicated.
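For the special case where f can be written as repeated application of a binary operator (a sum, a max, and so on), the incremental approach does not have to be complicated. The following is only a sketch under that assumption: the names IncrementalZip and zipReduce are made up for illustration, and the local[2] master is just there so the example runs standalone. Like the zip-based code above, it relies on all RDDs having the same number of partitions and the same number of elements in each partition, which RDD.zip requires.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object IncrementalZip {
  // Hypothetical helper: combine the RDDs pairwise, left to right, so each
  // record only ever holds one running value instead of a Seq of all inputs.
  // Assumes the RDDs are zip-compatible (same partitioning and element counts).
  def zipReduce(rdds: Seq[RDD[Int]])(op: (Int, Int) => Int): RDD[Int] = {
    require(rdds.nonEmpty, "need at least one RDD")
    rdds.reduceLeft { (acc: RDD[Int], next: RDD[Int]) =>
      acc.zip(next).map { case (a, b) => op(a, b) }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("incremental-zip").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdds = Seq(
        sc.parallelize(Array(1, 2, 3, 4), 2),
        sc.parallelize(Array(11, 12, 13, 14), 2),
        sc.parallelize(Array(21, 22, 23, 24), 2),
        sc.parallelize(Array(31, 32, 33, 34), 2)
      )
      // element-wise sum across the four RDDs
      val sums = zipReduce(rdds)(_ + _).collect()
      println(sums.mkString(", ")) // prints 64, 68, 72, 76
    } finally {
      sc.stop()
    }
  }
}
```

This only covers functions that decompose into a binary operator. An arbitrary f: Seq[Int] => Int still needs all of its inputs gathered into one collection per record, as in makeZip.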
Upvotes: 4