Reputation: 2135
I have a keyed set of records, each containing a book id and a reader id field:
case class Book(book: Int, reader: Int)
How can I use aggregateByKey to combine all records with the same key into one record of the following format:
(key: Int, (books: List[Int], readers: List[Int]))
where books is a list of all books and readers is a list of all readers from the records with the given key?
My code (below) results in compilation errors:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Aggr {
  case class Book(book: Int, reader: Int)
  val bookArray = Array(
    (2,Book(book = 1, reader = 700)),
    (3,Book(book = 2, reader = 710)),
    (4,Book(book = 3, reader = 710)),
    (2,Book(book = 8, reader = 710)),
    (3,Book(book = 1, reader = 720)),
    (4,Book(book = 2, reader = 720)),
    (4,Book(book = 8, reader = 720)),
    (3,Book(book = 3, reader = 730)),
    (4,Book(book = 8, reader = 740))
  )
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Aggr")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val books = sc.parallelize(bookArray)
    val aggr = books.aggregateByKey((List()[Int], List()[Int]))
    ({case
      ((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
        (bookList ++ List(book), readerList ++ List(reader))
      },
      {case ((bookLst1:List[Int], readerLst1:List[Int]),
        (bookLst2:List[Int], readerLst2:List[Int])
      ) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) })
  }
}
Errors:
Error:(36, 44) object Nil does not take type parameters.
    val aggr = books.aggregateByKey((List()[Int], List()[Int]))
                                           ^
Error:(37, 6) missing parameter type for expanded function
The argument types of an anonymous function must be fully known. (SLS 8.5)
Expected type was: ?
    ({case
     ^
Update
When I initialize the accumulator with (List(0), List(0))
everything compiles, but extra zeros are inserted into the result. Very interesting:
val aggr : RDD[(Int, (List[Int], List[Int]))] = books.aggregateByKey((List(0), List(0))) (
{case
((bookList:List[Int],readerList:List[Int]), Book(book, reader)) =>
(bookList ++ List(book), readerList ++ List(reader))
},
{case ((bookLst1:List[Int], readerLst1:List[Int]),
(bookLst2:List[Int], readerLst2:List[Int])
) => (bookLst1 ++ bookLst2, readerLst1 ++ readerLst2) }
)
This results in the following output:
(2,(List(0, 1, 0, 8),List(0, 700, 0, 710)))
(3,(List(0, 2, 0, 1, 0, 3),List(0, 710, 0, 720, 0, 730)))
(4,(List(0, 3, 0, 2, 8, 0, 8),List(0, 710, 0, 720, 720, 0, 740)))
If I could use empty lists as initializers instead of lists containing zeros, there would be no extra zeros and the lists would concatenate nicely.
Can somebody please explain why the empty-list initializer (List(), List())
results in an error while (List(0), List(0))
compiles? Is it a Scala bug or a feature?
Upvotes: 0
Views: 1965
Reputation: 22156
Answering your update: you misplaced the type parameter for your lists. If you had written List[Int]() instead of List()[Int], everything would have worked. The compiler error message describes the problem correctly, but it is not easy to understand. By putting [Int] at the end, you are passing a type parameter to the result of the List() call. The result of List() is Nil, a singleton object representing an empty list, and it does not take type parameters.
As for why List(0) also works: Scala performs type inference when it can. You declared a list with one element, 0, which is an integer, so Scala inferred List[Int]. Note, however, that this is not an empty list but a list containing a single zero. You probably want to use List[Int]() instead.
Just using List() doesn't work because there is no element to infer the type from, so Scala falls back to List[Nothing], which does not fit the List[Int] accumulator your functions return.
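To make the difference between these spellings concrete, here is a small plain-Scala sketch (no Spark needed). The object and value names are made up for illustration; only List, List.empty and Nil come from the standard library.

```scala
object EmptyListTypes {
  // The type argument belongs to List itself, before the argument list.
  val viaApply: List[Int] = List[Int]()

  // Equivalent spelling that states the intent more directly.
  val viaEmpty: List[Int] = List.empty[Int]

  // Inferred as List[Int] from its element, but it is NOT empty.
  val singleZero = List(0)

  // With no element to infer from, the element type is Nothing.
  val untyped: List[Nothing] = List()

  def main(args: Array[String]): Unit = {
    println(viaApply == viaEmpty)     // true: both are Nil
    println(singleZero.length)        // 1
    println(viaApply ++ List(1, 8))   // List(1, 8)
  }
}
```

Either List[Int]() or List.empty[Int] should work as the zero value; List()[Int] does not compile, for the reason given above.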
Upvotes: 0
Reputation: 15141
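Your approach is fine; the problems are purely syntactic. Besides writing the type parameter as List[Int]() rather than List()[Int], you need to move the opening parenthesis of the second argument list up onto the same line as the call. Change this: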
val aggr = books.aggregateByKey((List()[Int], List()[Int]))
({case
to this:
val aggr = books.aggregateByKey((List[Int](), List[Int]())) (
{case
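When a line starts with (, Scala ends the previous statement there and parses the line as a new expression rather than as a second argument list, which is what triggers the "missing parameter type" error. These links explain the rules in more detail: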
What are the precise rules for when you can omit parenthesis, dots, braces, = (functions), etc.? (first answer)
http://docs.scala-lang.org/style/method-invocation.html#suffix-notation
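For reference, here is a rough sketch of the corrected call shape that runs without a Spark cluster. aggregateByKeyLocal is a hypothetical helper written for this example, not part of Spark: it only imitates the curried signature of aggregateByKey (zero value, then seqOp and combOp) on in-memory sequences that stand in for partitions. The split of the question's records into two partitions is also made up.

```scala
object AggrSketch {
  case class Book(book: Int, reader: Int)

  // Hypothetical local stand-in for Spark's aggregateByKey (an assumption for
  // illustration only): each inner Seq plays the role of one partition.
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]])(zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Within each partition, fold every key's values starting from the zero value.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zero)(seqOp)
      }
    }
    // Across partitions, merge the partial results for each key.
    perPartition.flatMap(_.toSeq).groupBy(_._1).map { case (k, us) =>
      k -> us.map(_._2).reduce(combOp)
    }
  }

  // The question's records, split into two made-up partitions.
  val partitions: Seq[Seq[(Int, Book)]] = Seq(
    Seq((2, Book(1, 700)), (3, Book(2, 710)), (4, Book(3, 710)), (2, Book(8, 710))),
    Seq((3, Book(1, 720)), (4, Book(2, 720)), (4, Book(8, 720)),
        (3, Book(3, 730)), (4, Book(8, 740)))
  )

  // Typed empty lists as the zero value, and the next argument list
  // opens on the same line as the call.
  val aggr: Map[Int, (List[Int], List[Int])] =
    aggregateByKeyLocal(partitions)((List.empty[Int], List.empty[Int]))(
      { case ((books, readers), Book(book, reader)) =>
          (books :+ book, readers :+ reader) },
      { case ((books1, readers1), (books2, readers2)) =>
          (books1 ++ books2, readers1 ++ readers2) }
    )

  def main(args: Array[String]): Unit =
    aggr.toSeq.sortBy(_._1).foreach(println)
    // (2,(List(1, 8),List(700, 710)))
    // (3,(List(2, 1, 3),List(710, 720, 730)))
    // (4,(List(3, 2, 8, 8),List(710, 720, 720, 740)))
}
```

The sketch also hints at why (List(0), List(0)) scattered zeros through the output: the zero value seeds the fold once per key in every partition where that key occurs, so it has to be a true neutral element such as an empty list.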
Upvotes: 2