Reputation: 53916
The code below groups a List of Strings into a List[(String, List[String])]. Whenever a string of length 5 in all capitals is encountered, it is treated as an identifier, and all data following that identifier is grouped into a list. Each group is terminated by the next empty line. So the "lines" below get converted to:
(IDENT,List(p1text, p2text))
(IDENY,List(p2text, p3text, p4text))
Is there a more idiomatic way of achieving this in Scala/Spark, possibly using a groupBy call with a predicate? Ideally the resulting data structure would be of type RDD[(String, List[String])] instead of List[(String, List[String])].
val lines = List[String]("line1",
  " ",
  "line2",
  " ",
  " IDENT",
  "p1text",
  "p2text",
  " ",
  " IDENY",
  "p2text",
  "p3text",
  "p4text",
  " ",
  "some text")                                    //> lines  : List[String] = List(line1, " ", line2, " ", " IDENT", p1text, p2text,
                                                  //| " ", " IDENY", p2text, p3text, p4text, " ", some text)
// Collects the non-blank lines that follow index i, stopping at the first blank line.
def getItems(i: Int): List[String] = {
  var iter = i
  val l = new scala.collection.mutable.ArrayBuffer[String]()
  while (!lines(iter).trim.isEmpty) {
    iter = iter + 1
    if (!lines(iter).trim.isEmpty)
      l.append(lines(iter).trim)
  }
  l.toList
}                                                 //> getItems: (i: Int)List[String]
val regex = "\\w{5}" //> regex : String = \w{5}
val u: List[(String, List[String])] = lines.zipWithIndex.map({
  case (s, i) => {
    if (s.trim.toUpperCase.matches(regex)) {
      (s.trim, getItems(i))
    } else {
      ("", List())
    }
  }
})                                                //> u  : List[(String, List[String])] = List((line1,List()), ("",List()),
                                                  //| (line2,List()), ("",List()), (IDENT,List(p1text, p2text)), ("",List()),
                                                  //| ("",List()), ("",List()), (IDENY,List(p2text, p3text, p4text)), ("",List()),
                                                  //| ("",List()), ("",List()), ("",List()), ("",List()))
val fi: List[(String, List[String])] = u.filterNot(f => f._2.isEmpty || f._2(0).trim.isEmpty)
                                                  //> fi  : List[(String, List[String])] = List((IDENT,List(p1text, p2text)),
                                                  //| (IDENY,List(p2text, p3text, p4text)))
fi.foreach(println) //> (IDENT,List(p1text, p2text))
//| (IDENY,List(p2text, p3text, p4text))
Upvotes: 3
Views: 777
Reputation: 11985
You can start with the idiomatic way to write your splitting in Scala: as a recursive function.
def getItems(l: List[String]): List[(String, List[String])] = {
  if (l.isEmpty) List()
  else {
    val caps = "[A-Z]+".r
    // split off the current block: the lines up to the next blank line
    val (beg, end) = l.span(_.trim.nonEmpty)
    if (beg.nonEmpty)
      beg.head.trim match {
        // the block starts with an all-caps identifier: keep it with its data lines
        case caps() => (beg.head.trim, beg.tail) :: getItems(end.drop(1))
        case _      => getItems(end.drop(1))
      }
    else
      getItems(end.tail)
  }
}
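For example, applied to the sample lines value from the question (assuming that sample data is in scope), this version should reproduce the expected groups:
getItems(lines).foreach(println)
// (IDENT,List(p1text, p2text))
// (IDENY,List(p2text, p3text, p4text))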
Then you can make it stack-safe (and a bit faster) by rewriting it as a tail-recursive function.
import scala.annotation.tailrec

def getItemsFast(l: List[String]): List[(String, List[String])] = {
  @tailrec
  def getItemsAux(l: List[String], res: List[(String, List[String])]): List[(String, List[String])] = {
    if (l.isEmpty) res.reverse
    else {
      val caps = "[A-Z]+".r
      val (beg, end) = l.span(_.trim.nonEmpty)
      if (beg.nonEmpty)
        beg.head.trim match {
          case caps() => getItemsAux(end.drop(1), (beg.head.trim, beg.tail) :: res)
          case _      => getItemsAux(end.drop(1), res)
        }
      else
        getItemsAux(end.tail, res)
    }
  }
  getItemsAux(l, List())
}
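Both versions should produce the same result; a quick sanity check on the sample data from the question (assuming lines is in scope):
assert(getItemsFast(lines) == getItems(lines))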
Then the easy (but not correct, see below) way to retrieve this in Spark, if you have an RDD of lines, is to mapPartitions on your RDD.
myRDDOfLines.mapPartitions(lines => {
  // lines is an Iterator[String]: materialize it for getItemsFast and return an Iterator
  getItemsFast(lines.toList).iterator
})
This should mostly work, but it will fail to notice records that have been partitioned such that the identifier is in one partition while some of 'its' lines trail in the next partition.
The fault is in the way you build records as partitionable units: what you really want is an RDD of records (with one record being an element of the output list above; it should be clear what the key and value should be). That's not what sc.textFile gives you. There are many ways to load that data better into Spark. You can, for example:
- read each file as a single record with wholeTextFiles
- write your own TextInputFormat and a RecordReader
- set the record delimiter through a hadoop.Configuration object ... (a sketch of this last option follows)
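A minimal sketch of that last option, assuming the records in the source file are separated by a truly blank line, using a hypothetical input path, and with sc being the usual SparkContext:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Assumption: records in the file are separated by a blank line ("\n\n").
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\n\n")

// Each RDD element is now one whole record (identifier line plus its data lines).
val records = sc.newAPIHadoopFile(
  "hdfs:///path/to/input",          // hypothetical path
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf
).map { case (_, text) => text.toString }

// Split each record into (identifier, data lines); the identifier and its lines
// now always travel together, regardless of partitioning.
val grouped: org.apache.spark.rdd.RDD[(String, List[String])] =
  records.flatMap { rec =>
    rec.split("\n").map(_.trim).filter(_.nonEmpty).toList match {
      case id :: rest if id.matches("[A-Z]+") => Some((id, rest))
      case _                                  => None
    }
  }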
Upvotes: 1