blue-sky

Reputation: 53916

Grouping data using Scala/Apache Spark

The code below groups a List of Strings into type List[(String, List[String])]. Whenever a String of length 5 in all capitals is encountered, it is treated as an identifier, and all data subsequent to the identifier is grouped into a list under it. The terminating factor for each group is an empty line. So the "lines" below get converted to:

(IDENT,List(p1text, p2text))
(IDENY,List(p2text, p3text, p4text))

Is there a more idiomatic way of achieving this in Scala/Spark? Possibly using a groupBy call with a predicate?

Ideally the data structure would be of type RDD[(String, List[String])] instead of List[(String, List[String])].

  val lines = List[String]("line1",
    "                                                                                                                       ",
    "line2",
    "                                                                                               ",
    "                   IDENT",
    "p1text",
    "p2text",
    "                                                                                               ",
    "       IDENY",
    "p2text",
    "p3text",
    "p4text",
    "                   ",
    "some text")                                  //> lines  : List[String] = List(line1, "                 
                                                  //|                                       
                                                  //|                                       
                                                  //|                   ", line2, "             
                                                  //|                                       
                                                  //|                                       
                                                  //| ", "                  IDENT", p1text, p2text, "   
                                                  //|                                       
                                                  //|                                       
                                                  //|           ", "        IDENY", p2text, p3text, p4text, "   
                                                  //|               ", some text)

  // Collects the (trimmed) lines following the identifier at index i,
  // stopping at the first blank line or at the end of the input.
  def getItems(i: Int): List[String] = {
    var iter = i
    val l = new scala.collection.mutable.ArrayBuffer[String]()
    while (iter + 1 < lines.length && lines(iter + 1).trim.nonEmpty) {
      iter = iter + 1
      l.append(lines(iter).trim)
    }
    l.toList
  }                                               //> getItems: (i: Int)List[String]

  val regex = "\\w{5}"                            //> regex  : String = \w{5}

  val u: List[(String, List[String])] = lines.zipWithIndex.map {
    case (s, i) =>
      if (s.trim.toUpperCase.matches(regex)) (s.trim, getItems(i))
      else ("", List())
  }                                               //> u  : List[(String, List[String])] = List((line1,List()), ("",List()), (line2,List()),
                                                  //| ("",List()), (IDENT,List(p1text, p2text)), ("",List()), ("",List()), ("",List()),
                                                  //| (IDENY,List(p2text, p3text, p4text)), ("",List()), ("",List()), ("",List()),
                                                  //| ("",List()), ("",List()))


  val fi: List[(String, List[String])] = u.filterNot(f => f._2.isEmpty || f._2(0).trim.isEmpty)
                                                  //> fi  : List[(String, List[String])] = List((IDENT,List(p1text, p2text)),
                                                  //| (IDENY,List(p2text, p3text, p4text)))
  fi.foreach(println)                             //> (IDENT,List(p1text, p2text))
                                                  //| (IDENY,List(p2text, p3text, p4text))

Upvotes: 3

Views: 777

Answers (1)

Francois G

Reputation: 11985

You can start with the idiomatic way to write your splitting in Scala: as a recursive function.

def getItems(l: List[String]): List[(String, List[String])] = {
  if (l.isEmpty) List()
  else {
    val caps = "[A-Z]+".r
    // Take the block of non-blank lines at the head of the list
    val (beg, end) = l.span(_.trim.nonEmpty)
    if (beg.nonEmpty)
      beg.head.trim match {
        // Block starts with an all-caps identifier: emit a record, recurse on the rest
        case caps() => (beg.head.trim, beg.tail) :: getItems(end.drop(1))
        // Block starts with anything else: skip it
        case _ => getItems(end.drop(1))
      }
    else
      // Leading blank line: drop it and continue
      getItems(end.tail)
  }
}
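
Run against the sample `lines` from the question, this yields the expected records:

getItems(lines)
// => List((IDENT,List(p1text, p2text)), (IDENY,List(p2text, p3text, p4text)))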

Then you can speed it up, and make it stack-safe on long inputs, by turning it into a tail-recursive function.

import scala.annotation.tailrec

def getItemsFast(l: List[String]): List[(String, List[String])] = {
  val caps = "[A-Z]+".r  // compile the pattern once instead of on every call

  @tailrec
  def getItemsAux(l: List[String], res: List[(String, List[String])]): List[(String, List[String])] = {
    if (l.isEmpty) res.reverse  // the accumulator was built in reverse order
    else {
      val (beg, end) = l.span(_.trim.nonEmpty)
      if (beg.nonEmpty)
        beg.head.trim match {
          case caps() => getItemsAux(end.drop(1), (beg.head.trim, beg.tail) :: res)
          case _      => getItemsAux(end.drop(1), res)
        }
      else
        getItemsAux(end.tail, res)
    }
  }

  getItemsAux(l, List())
}
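
As a quick sanity check (again assuming the sample `lines` from the question), the two versions agree:

assert(getItemsFast(lines) == getItems(lines))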

Then, the easy (but not correct, see below) way to retrieve this in Spark, if you have an RDD of lines, is to mapPartitions over your RDD.

myRDDOfLines.mapPartitions(lines =>
  // mapPartitions hands us an Iterator[String] and expects an Iterator back
  getItemsFast(lines.toList).iterator
)

This should mostly work, but it will fail on records that straddle a partition boundary: the identifier lands in one partition while some of 'its' lines trail into the next.

The fault is in the way you build records as partitionable units: what you really want is an RDD of records (one record being an element of the output list above; it should be clear what the key and the value should be). That's not what sc.textFile gives you. There are many ways to load that data into Spark in a better shape. You can, for example:

  • split your text into several files at the empty lines and use wholeTextFiles
  • implement a custom TextInputFormat and RecordReader
  • if you can tease out a minimum number of blank characters to use as a record separator, you can use the Hadoop support for multi-line records, providing the separator through a hadoop Configuration object (a sketch follows this list)
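
As a rough sketch of that last option: Hadoop's TextInputFormat honours a custom record delimiter via the textinputformat.record.delimiter configuration key, so if the blocks are separated by genuinely empty lines you can read one whole block per record. The input path, and the assumption that "\n\n" is the separator, are hypothetical here:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Assumption: records are separated by a genuinely empty line ("\n\n").
val conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "\n\n")

val blocks = sc.newAPIHadoopFile(
    "hdfs:///path/to/input.txt",          // hypothetical path
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    conf)
  .map { case (_, text) => text.toString } // copy: Hadoop reuses the Text instance

// Each element of `blocks` is now a whole multi-line record, so it can be
// parsed locally into the desired RDD[(String, List[String])].
val records = blocks.flatMap { block =>
  block.split("\n").map(_.trim).filter(_.nonEmpty).toList match {
    case id :: rest if id.matches("[A-Z]+") => Some((id, rest))
    case _                                  => None
  }
}

Note that the delimiter is matched literally, so the whitespace-padded "blank" lines in the sample data would not split blocks as-is; that is why the last bullet talks about teasing out a usable separator first.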

Upvotes: 1
