Elliot

Reputation: 2690

How to convert a List to an RDD with Scala and Spark

I'm trying to generate an RDD from a List and another RDD using Scala and Spark. The idea is to take a list of values and generate an index containing all the entries of the original dataset that contain each value. Here's the code that I'm trying:

  def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = {
    // filter function
    def hasVal(foo: String)(bar: Int): Boolean =
      foo.toInt == bar
    // call to sc.parallelize to ensure that an RDD is returned
    sc parallelize(
      foos map (_ match {
        case (s: String) => (s, bars filter hasVal(s))
      })
    )
  }

Unfortunately, this does not compile in sbt:

> compile
[info] Compiling 1 Scala source to $TARGETDIR/target/scala-2.11/classes...
[error] $TARGETDIR/src/main/scala/wikipedia/WikipediaRanking.scala:56: type mismatch;
[error]  found   : List[(String, org.apache.spark.rdd.RDD[Int])]
[error]  required: Seq[(String, Iterable[Int])]
[error] Error occurred in an application involving default arguments.
[error]       foos map (_ match {
[error]            ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 1 s, completed Mar 11, 2017 7:11:31 PM

I really don't understand the error I'm getting. List is a subclass of Seq, and I presumed that RDD was a subclass of Iterable. Is there something obvious that I've missed?
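
To make the intent concrete, here is roughly the result I'm after, sketched with plain Scala collections (this is only an illustration of the expected index, not the Spark version above):

  // Plain-collections sketch of the intended index (illustration only)
  val foos = List("1", "2")
  val bars = List(1, 2, 2, 3)
  val index: List[(String, Iterable[Int])] =
    foos.map(f => (f, bars.filter(_ == f.toInt)))
  // index == List(("1", List(1)), ("2", List(2, 2)))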

Upvotes: 1

Views: 3468

Answers (2)

Yuriy Gatilin

Reputation: 132

Here is my solution using a for-comprehension (it should use less memory than a cartesian product):

  def mcveInvertIndex(foos: List[String],
                      bars: RDD[Int]): RDD[(String, Iterable[Int])] = 
  {

    // filter function
    def hasVal(foo: String, bar: Int): Boolean =
      foo.toInt == bar

    // Producing RDD[(String, Iterable[Int])]
    (for {
      bar <- bars // it's important to have the RDD
                  // in the first position of the for-comprehension
                  // to produce the correct result type
      foo <- foos
      if hasVal(foo, bar)
    } yield (foo, bar)).groupByKey()
  }
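
For example, assuming a SparkContext named sc is available to build the input RDD (it is not shown in the answer), the function could be used like this:

  // Usage sketch; `sc` is an assumed SparkContext, not defined above
  val bars = sc.parallelize(Seq(1, 2, 2, 3))
  val index = mcveInvertIndex(List("1", "2"), bars)
  index.collect().foreach(println)
  // prints something like (order and concrete Iterable type may vary):
  // (1,CompactBuffer(1))
  // (2,CompactBuffer(2, 2))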

Upvotes: 3

Justin Pihony

Reputation: 67075

As mentioned in the comment, an RDD is not an Iterable, so you have to combine the two in some way and then aggregate them. This is my quick solution, although there might be a more efficient way:

  def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = {
    sc.makeRDD(foos)                                   // lift the List into an RDD
      .cartesian(bars)                                 // pair every foo with every bar
      .keyBy(_._1)                                     // key each (foo, bar) pair by the foo
      .aggregateByKey(Iterable.empty[Int])(
        (agg: Iterable[Int], currVal: (String, Int)) => {
          if (currVal._1.toInt != currVal._2) agg      // drop non-matching pairs
          else currVal._2 +: agg.toList                // keep the matching bar value
        },
        _ ++ _                                         // merge the per-partition results
      )
  }
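
A quick usage sketch (again assuming a SparkContext named sc, which the snippet above already uses). One difference worth noting against the for-comprehension answer: because every element of foos survives keyBy and aggregateByKey (as long as bars is non-empty), a foo with no matching bar still appears here, paired with an empty Iterable:

  // Usage sketch; `sc` and the inputs are assumptions for illustration
  val bars = sc.parallelize(Seq(1, 2, 2, 3))
  mcveInvertIndex(List("1", "5"), bars).collect().foreach(println)
  // prints roughly (order and concrete Iterable type may vary):
  // (1,List(1))
  // (5,List())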

Upvotes: 1
