Reputation: 929
I had a file that contained a list of elements like this:
00|905000|20160125204123|79644809999||HGMTC|1||22|7905000|56321647569|||34110|I||||||250995210056537|354805064211510||56191|||38704||A|||11|V|81079681404134|5||||SE|||G|144|||||||||||||||Y|b00534589.huawei_anadyr.20151231184912||1|||||79681404134|0|||+@@+1{79098509982}2{2}3{2}5{79644809999}6{0000002A7A5AC635}7{79681404134}|20160125|
Through a series of steps, I managed to convert it to a list of elements like this:
(902996760100000,CompactBuffer(6, 5, 2, 2, 8, 6, 5, 3))
Here 905000 and 902996760100000 are keys and 6, 5, 2, 2, 8, 6, 5, 3 are values. Values can be numbers from 1 to 8. Is there any way to count the number of occurrences of these values using Spark, so the result looks like this?
(902996760100000, 0_1, 2_2, 1_3, 0_4, 2_5, 2_6, 0_7, 1_8)
I could do it with if/else blocks and stuff, but that won't be pretty, so I wondered if there are any instruments I could use in Scala/Spark.
This is my code.
class ScalaJob(sc: SparkContext) {
  def run(cdrPath: String): RDD[(String, Iterable[String])] = {
    // read the file
    val fileCdr = sc.textFile(cdrPath)
    // find values in every raw CDR line
    val valuesCdr = fileCdr.map { dataRaw =>
      val p = dataRaw.split("[|]", -1)
      (p(1), ScalaJob.processType(ScalaJob.processTime(p(2)) + "_" + p(32)))
    }
    val x = valuesCdr.groupByKey()
    return x
  }
}
Any advice on optimizing it would be appreciated. I'm really new to Scala/Spark.
Upvotes: 0
Views: 2747
Reputation: 37852
First, Scala is a type-safe language and so is Spark's RDD API - so it's highly recommended to use the type system instead of going around it by "encoding" everything into Strings.
So I'll suggest a solution that creates an RDD[(String, Seq[(Int, Int)])] (with the second item in the tuple being a sequence of (ID, count) pairs) and not an RDD[(String, Iterable[String])], which seems less useful.
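To illustrate the difference, here is a minimal sketch (not from the original code; the sample data and the SparkContext named sc are assumptions for illustration):

// minimal sketch, assuming an existing SparkContext named sc and made-up sample data
val typed = sc.parallelize(Seq(("902996760100000", Seq((1, 0), (2, 2), (3, 1)))))
typed.mapValues(_.filter(_._2 > 0))    // counts stay as (Int, Int) pairs and can be filtered or summed directly

val encoded = sc.parallelize(Seq(("902996760100000", Iterable("0_1", "2_2", "1_3"))))
encoded.mapValues(_.map(_.split("_"))) // string-encoded values must be split and re-parsed before any further use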
Here's a simple function that counts the occurrences of 1 to 8 in a given Iterable[Int]:
def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
  (1 to 8).map(i => (i, l.count(_ == i)))
}
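For example, applied to the values from the question (plain Scala, no Spark needed; the Vector in the comment is the expected result):

countValues(Seq(6, 5, 2, 2, 8, 6, 5, 3))
// Vector((1,0), (2,2), (3,1), (4,0), (5,2), (6,2), (7,0), (8,1))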
You can use mapValues with this function (place the function in the object for serializability, like you did with the rest) on an RDD[(String, Iterable[Int])] to get the result:
valuesCdr.groupByKey().mapValues(ScalaJob.countValues)
The entire solution can then be simplified a bit:
// Joda-Time is assumed for the date/time parsing below
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

class ScalaJob(sc: SparkContext) {
  import ScalaJob._

  def run(cdrPath: String): RDD[(String, Seq[(Int, Int)])] = {
    val valuesCdr = sc.textFile(cdrPath)
      .map(_.split("\\|"))
      .map(p => (p(1), processType(processTime(p(2)), p(32))))
    valuesCdr.groupByKey().mapValues(countValues)
  }
}

object ScalaJob {
  // each hour range of the day maps to a "day part" ID (1 to 4)
  val dayParts = Map((6 to 11) -> 1, (12 to 18) -> 2, (19 to 23) -> 3, (0 to 5) -> 4)

  // parse the timestamp and return the day part its hour falls into
  def processTime(s: String): Int = {
    val hour = DateTime.parse(s, DateTimeFormat.forPattern("yyyyMMddHHmmss")).getHourOfDay
    dayParts.filterKeys(_.contains(hour)).values.head
  }

  // combine the day part with the record type ("S" or "V") into a value from 1 to 8
  def processType(dayPart: Int, s: String): Int = s match {
    case "S" => 2 * dayPart - 1
    case "V" => 2 * dayPart
  }

  // count how many times each of the values 1 to 8 occurs
  def countValues(l: Iterable[Int]): Seq[(Int, Int)] = {
    (1 to 8).map(i => (i, l.count(_ == i)))
  }
}
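A minimal usage sketch, assuming a local run; the app name, master and input path are placeholders, not from the original post:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cdr-counts").setMaster("local[*]"))
val counts = new ScalaJob(sc).run("/path/to/cdr/file")  // placeholder path
counts.take(1)
// e.g. Array((902996760100000, Vector((1,0), (2,2), (3,1), (4,0), (5,2), (6,2), (7,0), (8,1))))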
Upvotes: 1