I'm trying to generate an RDD from a List and another RDD using Scala and Spark. The idea is to take a list of values and generate an index containing all the entries of the original dataset that contain each value.
Here's the code that I'm trying:
def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = {
  // filter function
  def hasVal(foo: String)(bar: Int): Boolean =
    foo.toInt == bar
  // call to sc.parallelize to ensure that an RDD is returned
  sc parallelize(
    foos map (_ match {
      case (s: String) => (s, bars filter hasVal(s))
    })
  )
}
Unfortunately, this does not compile in sbt:
> compile
[info] Compiling 1 Scala source to $TARGETDIR/target/scala-2.11/classes...
[error] $TARGETDIR/src/main/scala/wikipedia/WikipediaRanking.scala:56: type mismatch;
[error] found : List[(String, org.apache.spark.rdd.RDD[Int])]
[error] required: Seq[(String, Iterable[Int])]
[error] Error occurred in an application involving default arguments.
[error] foos map (_ match {
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 1 s, completed Mar 11, 2017 7:11:31 PM
I really don't understand the errors that I'm getting. List is a subclass of Seq, and I presume that RDDs are a subclass of Iterable. Is there something obvious that I've missed?
Upvotes: 1
Here is my solution using a for-comprehension (it should use less memory than the cartesian product):
def mcveInvertIndex(foos: List[String],
                    bars: RDD[Int]): RDD[(String, Iterable[Int])] =
{
  // filter function
  def hasVal(foo: String, bar: Int): Boolean =
    foo.toInt == bar
  // Producing RDD[(String, Iterable[Int])]
  (for {
    bar <- bars // it's important to have the RDD
                // in the first position of the for-comprehension
                // to produce the correct result type
    foo <- foos
    if hasVal(foo, bar)
  } yield (foo, bar)).groupByKey()
}
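The reason the first generator decides the result type is that a for-comprehension is rewritten into `flatMap` / `withFilter` / `map` calls on the first collection. Here is a minimal sketch of that rewriting, using plain Scala collections so that it runs without Spark. Note the assumptions: `Vector[Int]` is only a stand-in for `RDD[Int]`, the `group` helper is a hypothetical local analogue of `groupByKey`, and the object name is made up for illustration.

```scala
object ForDesugarSketch {
  // Same predicate as in the answer above.
  def hasVal(foo: String, bar: Int): Boolean =
    foo.toInt == bar

  // For-comprehension form; Vector[Int] is a stand-in for RDD[Int].
  def withFor(foos: List[String], bars: Vector[Int]): Vector[(String, Int)] =
    for {
      bar <- bars
      foo <- foos
      if hasVal(foo, bar)
    } yield (foo, bar)

  // Roughly what the compiler generates for the comprehension above:
  // flatMap is called on `bars`, so the outer collection type wins.
  def desugared(foos: List[String], bars: Vector[Int]): Vector[(String, Int)] =
    bars.flatMap { bar =>
      foos.withFilter(foo => hasVal(foo, bar)).map(foo => (foo, bar))
    }

  // Hypothetical local analogue of groupByKey: collect the values per key.
  def group(pairs: Vector[(String, Int)]): Map[String, Vector[Int]] =
    pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
}
```

With an RDD in the first position, the same rewriting calls `RDD.flatMap`, which yields an `RDD[(String, Int)]` that `groupByKey()` can then be applied to. With the `List` first, the outer call would be `List.flatMap`, and the result would not be an RDD.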
Upvotes: 3
As mentioned in the comment, an RDD is not an Iterable, so you have to combine the two in some way and then aggregate them. This is my quick solution, although there might be a more efficient way:
def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = {
  sc.makeRDD(foos)
    .cartesian(bars)
    .keyBy(x => x._1)
    .aggregateByKey(Iterable.empty[Int])(
      (agg: Iterable[Int], currVal: (String, Int)) => {
        if (currVal._1.toInt != currVal._2) agg
        else currVal._2 +: agg.toList
      },
      _ ++ _
    )
}
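One possible variation on the code above, offered only as a sketch: filter the cartesian pairs before grouping, so that only matching pairs are shuffled. Whether this is actually faster has not been measured here. Two assumptions differ from the original: the `SparkContext` is passed in as a parameter instead of being a global `sc`, and keys with no matching entry are left out of the result, whereas the `aggregateByKey` version keeps them with an empty collection.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object FilterThenGroupSketch {
  // `sc` is passed explicitly here (an assumption; the original uses a global).
  def invertIndex(sc: SparkContext,
                  foos: List[String],
                  bars: RDD[Int]): RDD[(String, Iterable[Int])] =
    sc.makeRDD(foos)
      .cartesian(bars)                                // every (foo, bar) pair
      .filter { case (foo, bar) => foo.toInt == bar } // keep matching pairs only
      .groupByKey()                                   // gather the bars per foo
}
```

As in the original, `foo.toInt` throws a `NumberFormatException` if a key is not numeric.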
Upvotes: 1