zork

Reputation: 2135

Spark: aggregateByKey into a pair of lists

I have a keyed set of records that contain book id as well as reader id fields.

case class Book(book: Int, reader: Int)

How can I use aggregateByKey to combine all records with the same key into one record of the following format:

(key: Int, (books: List[Int], readers: List[Int]))

where books is a list of all books and readers is a list of all readers from records with the given key?

My code (below) results in compilation errors:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}

object Aggr {

  case class Book(book: Int, reader: Int)

  val bookArray = Array(
      (2,Book(book = 1, reader = 700)),
      (3,Book(book = 2, reader = 710)),
      (4,Book(book = 3, reader = 710)),
      (2,Book(book = 8, reader = 710)),
      (3,Book(book = 1, reader = 720)),
      (4,Book(book = 2, reader = 720)),
      (4,Book(book = 8, reader = 720)),
      (3,Book(book = 3, reader = 730)),
      (4,Book(book = 8, reader = 740))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Aggr")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val books = sc.parallelize(bookArray)
    val aggr = books.aggregateByKey((List()[Int], List()[Int]))
    ({case
      ((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
      (bookList ++ List(book), readerList ++ List(reader))
      },
    {case ((bookLst1:List[Int], readerLst1:List[Int]),
    (bookLst2:List[Int], readerLst2:List[Int])
      ) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) })


  }
}

Errors:

Error:(36, 44) object Nil does not take type parameters.
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
                                       ^

Error:(37, 6) missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: ?
({case
 ^

Update

When I initialize the accumulator with (List(0), List(0)), everything compiles, but extra zeros are inserted into the result. Very interesting:

val aggr :  RDD[(Int, (List[Int], List[Int]))] = books.aggregateByKey((List(0), List(0))) (
{case
  ((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
  (bookList ++ List(book), readerList ++ List(reader))
  },
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
  ) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) }
)

This results in the following output:

(2,(List(0, 1, 0, 8),List(0, 700, 0, 710)))
(3,(List(0, 2, 0, 1, 0, 3),List(0, 710, 0, 720, 0, 730)))
(4,(List(0, 3, 0, 2, 8, 0, 8),List(0, 710, 0, 720, 720, 0, 740)))
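Why the zeros appear more than once per key: aggregateByKey uses the zero value as the starting accumulator for each key within each partition, and only afterwards merges those per-partition results with the second function. So (List(0), List(0)) contributes one 0 for every partition in which a key occurs. The plain-Scala sketch below imitates that with a hypothetical helper, `aggregateByKeyLike` (not a Spark API), so it runs without Spark; the five-way split of the records is an assumption, chosen because it reproduces the output above.

```scala
object ZeroPerPartition {
  case class Book(book: Int, reader: Int)

  // Accumulator: (books, readers)
  type Acc = (List[Int], List[Int])

  // Hypothetical stand-in for RDD.aggregateByKey (not a Spark API): fold each
  // partition's values per key starting from `zero`, then merge the
  // per-partition results for each key with `combOp`, in partition order.
  def aggregateByKeyLike[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zero)(seqOp) // `zero` is used once per key here
      }
    }
    perPartition.flatMap(_.toSeq).groupBy(_._1).map { case (k, partials) =>
      k -> partials.map(_._2).reduce(combOp)
    }
  }

  // Assumed split of the question's nine records into five partitions
  // (one that reproduces the output shown above).
  val partitions: Seq[Seq[(Int, Book)]] = Seq(
    Seq(2 -> Book(1, 700)),
    Seq(3 -> Book(2, 710), 4 -> Book(3, 710)),
    Seq(2 -> Book(8, 710), 3 -> Book(1, 720)),
    Seq(4 -> Book(2, 720), 4 -> Book(8, 720)),
    Seq(3 -> Book(3, 730), 4 -> Book(8, 740))
  )

  def run(zero: Acc): Map[Int, Acc] =
    aggregateByKeyLike(partitions, zero)(
      { case ((bs, rs), Book(b, r)) => (bs :+ b, rs :+ r) },
      { case ((bs1, rs1), (bs2, rs2)) => (bs1 ++ bs2, rs1 ++ rs2) }
    )

  def main(args: Array[String]): Unit = {
    // Zero = (List(0), List(0)): one 0 per partition that contains key 4.
    println(run((List(0), List(0)))(4))
    // prints (List(0, 3, 0, 2, 8, 0, 8),List(0, 710, 0, 720, 720, 0, 740))

    // Zero = two empty lists: nothing extra is added.
    println(run((List.empty[Int], List.empty[Int]))(4))
    // prints (List(3, 2, 8, 8),List(710, 720, 720, 740))
  }
}
```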

If I could use empty lists as initializers instead of lists containing zeros, there would be no extra zeros, of course, and the lists would concatenate nicely.

Can somebody please explain why the empty-list initializer (List(), List()) results in an error while (List(0), List(0)) compiles? Is it a Scala bug or a feature?

Upvotes: 0

Views: 1965

Answers (2)

Zoltán

Reputation: 22156

Answering your update: you misplaced the type parameter for your lists. If you had declared them as List[Int]() instead of List()[Int], everything would have worked. The compiler error message correctly describes the problem, but it isn't easy to understand. By putting [Int] at the end, you are passing a type parameter to the result of calling List(). That result is Nil, a singleton object representing the empty list, and it does not take type parameters.
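To make the placement concrete, here are a few ways to write a typed empty list that work as a zero value (plain Scala; the object and value names are only for illustration):

```scala
object TypedEmptyLists {
  // The type argument goes on List itself, before the (empty) argument list.
  val a = List[Int]()

  // Same thing via the factory method List.empty.
  val b = List.empty[Int]

  // Nil with a type ascription, which widens it to List[Int].
  val c: List[Int] = Nil

  // A zero value with the shape the question needs: (books, readers).
  val zero: (List[Int], List[Int]) = (List.empty[Int], List.empty[Int])

  // Does not compile ("object Nil does not take type parameters"):
  // val bad = List()[Int]
}
```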

As for why List(0) works: Scala performs type inference where it can. You've given the list one element, 0, which is an Int, so the compiler inferred List[Int]. Note, however, that this is not an empty list but a list containing a single zero. You probably want List[Int]() instead.

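Plain List() doesn't work either: with no elements and no type argument, Scala infers List[Nothing] for the zero value, and that type no longer matches the List[Int] values your functions return.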
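The same inference rule can be seen without Spark using the standard foldLeft, whose signature `foldLeft[B](z: B)(op: (B, A) => B)` has a curried shape similar to `aggregateByKey(zeroValue)(seqOp, combOp)`: the type parameter is fixed from the first argument list before the function is checked. A small sketch, assuming Scala 2:

```scala
object ZeroFixesType {
  val xs = List(1, 2, 3)

  // Typed empty zero: B = List[Int], so the function's result type fits.
  val ok: List[Int] = xs.foldLeft(List.empty[Int])((acc, x) => acc :+ x)

  // List(0) also compiles (B = List[Int]), but the 0 stays in the result.
  val seeded: List[Int] = xs.foldLeft(List(0))((acc, x) => acc :+ x)

  // Does not compile in Scala 2: List() is a List[Nothing], so B = List[Nothing]
  // and `acc :+ x` (a List[Int]) is not a List[Nothing].
  // val bad = xs.foldLeft(List())((acc, x) => acc :+ x)
}
```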

Upvotes: 0

Mateusz Dymczyk

Reputation: 15141

Actually, you're doing almost everything right; it's only that your indentation/syntax style is a bit sloppy. Besides writing the empty lists as List[Int](), you just need to move one parenthesis, from this:

val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case

Into this:

val aggr = books.aggregateByKey((List[Int](), List[Int]())) (
    {case
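The layout rule applies to any method with more than one parameter list, not just aggregateByKey. A minimal plain-Scala sketch using the standard foldLeft (same `(zero)(function)` shape) on a small, made-up list of books, so it runs without Spark:

```scala
object ArgListLayout {
  case class Book(book: Int, reader: Int)

  // Made-up sample data for illustration.
  val books = List(Book(1, 700), Book(8, 710))

  // The "(" that opens the second argument list stays on the same line as the
  // ")" closing the first one, so both lists belong to one foldLeft call.
  // (Inside parentheses, line breaks do not end a statement.)
  val (bookIds, readerIds) = books.foldLeft((List.empty[Int], List.empty[Int])) (
    { case ((bs, rs), Book(b, r)) => (bs :+ b, rs :+ r) }
  )

  // Broken layout (do not use): inside braces, a line break between ")" and "("
  // ends the statement, so foldLeft(...) loses its second argument list and the
  // next line is parsed as a separate expression with no expected type.
  // val broken = books.foldLeft((List.empty[Int], List.empty[Int]))
  // ({ case ((bs, rs), Book(b, r)) => (bs :+ b, rs :+ r) })
}
```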

These links might shed some light on why this didn't work for you:

What are the precise rules for when you can omit parenthesis, dots, braces, = (functions), etc.? (first answer)

http://docs.scala-lang.org/style/method-invocation.html#suffix-notation

Upvotes: 2
