Reputation: 2074
The title is probably confusing, so I will do my best to explain. I am trying to break this aggregateByKey call up into named functions, so that other teams who will be working with my code can see more easily how it works. I have the following aggregate:
val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
(accumCount, value) => accumCount.get(value.head) match {
case None => accumCount + (value.head -> 1)
case Some(count) => accumCount + (value.head -> (count + 1))
},
(accum1, accum2) => accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))}
).collect()
I've been wanting to break this up as follows:
val firstLet = Map[Char, Int]()
def fSeq(accumCount:?, value:?) = {
accumCount.get(value.head) match {
case None => accumCount + (value.head -> 1)
case Some(count) => accumCount + (value.head -> (count + 1))
}
}
def fComb(accum1:?, accum2:?) = {
accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))}
}
Because the initial value is a Map[Char, Int], I am not sure which data types to declare for accumCount and value. I've tried different things but nothing seems to work. Can someone help me define the data types and explain how you determined them?
Upvotes: 0
Views: 1068
Reputation: 330413
seqOp takes an accumulator of the same type as the initial value as its first argument, and a value of the same type as the values in your RDD.
combOp takes two accumulators of the same type as the initial value.
Assuming you want to aggregate RDD[(T, U)]:
def fSeq(accumCount: Map[Char, Int], value: U): Map[Char, Int] = ???
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = ???
I guess in your case U is simply String, so you should adjust the fSeq signature accordingly.
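To make the type reasoning concrete, here is a minimal sketch that runs without Spark. aggregateByKeyLocal is a hypothetical stand-in I made up for illustration, not Spark's implementation, and the sample partitions are invented data. It only shows how the zero value fixes the accumulator type and how the RDD's value type fixes the second argument of seqOp.

```scala
object AggregateTypesSketch {
  // Hypothetical local stand-in for aggregateByKey (an assumption, not Spark code).
  // K = key type, V = value type in the data, U = accumulator (zero value) type.
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Within each partition, fold every value into the accumulator of its key.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc + (k -> seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Across partitions, merge the accumulators that share a key.
    perPartition.foldLeft(Map.empty[K, U]) { (merged, part) =>
      part.foldLeft(merged) { case (acc, (k, u)) =>
        acc + (k -> acc.get(k).map(combOp(_, u)).getOrElse(u))
      }
    }
  }

  // The zero value is a Map[Char, Int] and the values are Strings,
  // so U = Map[Char, Int] and V = String.
  def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] =
    accumCount + (value.head -> (accumCount.getOrElse(value.head, 0) + 1))

  def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] =
    accum1 ++ accum2.map { case (k, v) => k -> (v + accum1.getOrElse(k, 0)) }

  // Two invented "partitions" of (Int, String) pairs.
  val partitions: Seq[Seq[(Int, String)]] = Seq(
    Seq((1, "apple"), (1, "avocado"), (2, "banana")),
    Seq((1, "blueberry"), (2, "beet"), (1, "apricot"))
  )

  // First-letter counts per key, combined across both partitions.
  val result: Map[Int, Map[Char, Int]] =
    aggregateByKeyLocal(partitions, Map[Char, Int]())(fSeq, fComb)
}
```

If the compiler accepts fSeq and fComb here, the same signatures should fit aggregateByKey on an RDD[(Int, String)] with a Map[Char, Int] zero value, since the shape of the two function arguments is the same.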
BTW, you can provide a default mapping and simplify your functions:
val firstLet = Map[Char, Int]().withDefault(x => 0)
def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] = {
accumCount + (value.head -> (accumCount(value.head) + 1))
}
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = {
val accum = (accum1.keys ++ accum2.keys).map(k => (k, accum1(k) + accum2(k)))
accum.toMap.withDefault(x => 0)
}
Finally, it could be more efficient to use scala.collection.mutable.Map:
import scala.collection.mutable.{Map => MMap}
def firstLetM = MMap[Char, Int]().withDefault(x => 0)
def fSeqM(accumCount: MMap[Char, Int], value: String): MMap[Char, Int] = {
accumCount += (value.head -> (accumCount(value.head) + 1))
}
def fCombM(accum1: MMap[Char, Int], accum2: MMap[Char, Int]): MMap[Char, Int] = {
accum2.foreach{case (k, v) => accum1 += (k -> (accum1(k) + v))}
accum1
}
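The mutable variant works because both functions may modify and return their first argument instead of allocating a new map. As a rough local illustration of that in-place pattern (no Spark involved; newZero, the object name, and the sample words are my own assumptions), each fold below starts from a fresh accumulator so that no state is shared:

```scala
import scala.collection.mutable.{Map => MMap}

object MutableAccumSketch {
  // A def, so every call builds a fresh accumulator with a default count of 0.
  def newZero: MMap[Char, Int] = MMap[Char, Int]().withDefault(_ => 0)

  // Increment the count for the first letter in place and return the same map.
  def fSeqM(acc: MMap[Char, Int], value: String): MMap[Char, Int] = {
    acc(value.head) = acc(value.head) + 1
    acc
  }

  // Add the counts of b into a in place and return a.
  def fCombM(a: MMap[Char, Int], b: MMap[Char, Int]): MMap[Char, Int] = {
    b.foreach { case (k, v) => a(k) = a(k) + v }
    a
  }

  // Invented sample data, standing in for the values of one key on two partitions.
  val words1 = Seq("spark", "scala", "rdd")
  val words2 = Seq("reduce", "shuffle")

  val left: MMap[Char, Int] = words1.foldLeft(newZero)(fSeqM)
  val right: MMap[Char, Int] = words2.foldLeft(newZero)(fSeqM)

  // Convert to an immutable Map once the merge is finished.
  val merged: Map[Char, Int] = fCombM(left, right).toMap
}
```

Note that fCombM changes its first argument, so left should not be reused after the merge.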
Test:
def randomChar() = (scala.util.Random.nextInt.abs % 58 + 65).toChar
def randomString() = {
(Seq(randomChar) ++ Iterator.iterate(randomChar)(_ => randomChar)
.takeWhile(_ => scala.util.Random.nextFloat > 0.1)).mkString
}
val stringRDD = sc.parallelize(
(1 to 500000).map(_ => (scala.util.Random.nextInt.abs % 60, randomString)))
val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
(accumCount, value) => accumCount.get(value.head) match {
case None => accumCount + (value.head -> 1)
case Some(count) => accumCount + (value.head -> (count + 1))
},
(accum1, accum2) => accum1 ++ accum2.map{
case(k,v) => k -> (v + accum1.getOrElse(k, 0))}
).collectAsMap()
val firstLetter2 = stringRDD
.aggregateByKey(firstLet)(fSeq, fComb)
.collectAsMap
val firstLetter3 = stringRDD
.aggregateByKey(firstLetM)(fSeqM, fCombM)
.mapValues(_.toMap)
.collectAsMap
firstLetter == firstLetter2
firstLetter == firstLetter3
Upvotes: 1