Phani Rahul

Reputation: 860

Better way to scan data using Scala and Spark

Problem

The input data has 2 types of records; let's call them R and W.

I need to traverse this data sequentially, from top to bottom. If the current record is of type W, it has to be merged into a map (let's call it workMap): if the key of that W-type record is already present in the map, the record's value is added to the existing value; otherwise a new entry is created in workMap.

If the current record is of type R, the workMap calculated up to this record is attached to the current record.

For example, if this is the order of records -

W1-   a -> 2
W2-   b -> 3
W3-   a -> 4
R1 
W4-   c -> 1
R2
W5-   c -> 4

Here W1, W2, W3, W4 and W5 are of type W, and R1 and R2 are of type R.

At the end of this function, I should have the following -

R1 - { a -> 6, 
       b -> 3 } //merged(W1, W2, W3)
R2 - { a -> 6, 
       b -> 3,
       c -> 1 } //merged(W1, W2, W3, W4)
{ a -> 6, 
  b -> 3,
  c -> 5 } //merged(W1, W2, W3, W4, W5)

I want every R-type record attached to the intermediate workMap calculated up to that point, plus the final workMap after the last record is processed.
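The expected behaviour can be pinned down with a small eager reference version. This is only a sketch with simplified, hypothetical types: `W`, `R`, String keys and Int values stand in for the real InputKey, InputVal, WorkKey and WorkVal. It keeps every report in memory, so it illustrates the semantics rather than a scalable solution:

```scala
object ScanSpec {
  // Hypothetical simplified records standing in for the real input types
  sealed trait Record
  final case class W(key: String, value: Int) extends Record
  final case class R(name: String) extends Record

  // Walks the records in order and returns (reports, final workMap)
  def scan(records: Seq[Record]): (Vector[(String, Map[String, Int])], Map[String, Int]) =
    records.foldLeft((Vector.empty[(String, Map[String, Int])], Map.empty[String, Int])) {
      case ((reports, work), W(k, v)) =>
        // W record: add to the existing value, or create a new entry
        (reports, work.updated(k, work.getOrElse(k, 0) + v))
      case ((reports, work), R(name)) =>
        // R record: attach the workMap accumulated so far
        (reports :+ (name -> work), work)
    }

  // The example sequence from the description above
  val input: Seq[Record] = Seq(
    W("a", 2), W("b", 3), W("a", 4), R("R1"), W("c", 1), R("R2"), W("c", 4))
}
```

Calling `ScanSpec.scan(ScanSpec.input)` should reproduce the R1, R2 and final maps listed above.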

Here is the code that I have written -

import scala.collection.mutable

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
  Iterator[(ReportKey, ReportVal)] = {

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
    val reportList = mutable.ArrayBuffer.empty[(ReportKey, ReportVal)]

    while (itr.hasNext) {
      val (iKey, iVal) = itr.next()

      if (iKey.recordType == reportType) {
        // creates a new (ReportKey, ReportVal) from the workMap built so far
        reportList += getNewReportRecord(workMap, iKey, iVal)
      }
      else {
        // if iKey is already present, merges the values;
        // otherwise adds a new entry
        updateWorkMap(workMap, iKey, iVal)
      }
    }
    // convert the final workMap entries into report-shaped records
    val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport)

    reportList.iterator ++ workList.iterator
  }

The ReportKey class looks like this -

case class ReportKey (
  // the type of record - report or work
  rType: Int,
  date: String,
  .....
)

There are two problems with this approach that I need help with -

  1. I have to keep track of a reportList - a list of R-type records with their intermediate workMaps attached. As the data grows, the reportList also grows, and I am running into OutOfMemoryErrors.
  2. I have to combine the reportList and workMap records in the same data structure and then return them. If there is a more elegant way, I would definitely consider changing this design.

For the sake of completeness: I am using Spark. The function calcPerPartition is passed as the argument to mapPartitions on an RDD. I need the workMaps from each partition to do some additional calculations later.

I know that if I don't have to return workMaps from each partition, the problem becomes much easier, like this -

...
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
val scan = itr.scanLeft[Option[(ReportKey, ReportVal)]](None) {
  (acc: Option[(ReportKey, ReportVal)], curr: (InputKey, InputVal)) =>

    if (curr._1.recordType == reportType) {
      val rec = getNewReportRecord(workMap, curr._1, curr._2)
      Some(rec)
    }
    else {
      updateWorkMap(workMap, curr._1, curr._2)
      None
    }
}

// keep only the R records
val reportList = scan.collect { case Some(rec) => rec }
//workMap is still empty after the scanLeft. 
... 
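The empty workMap is consistent with `Iterator.scanLeft` being lazy: the function passed to it runs only as the resulting iterator is consumed. The sketch below shows this with a hypothetical `seen` buffer standing in for workMap; the names are illustrative and not part of the code above:

```scala
import scala.collection.mutable

object LazyScanDemo {
  // Returns (side effects before consuming, side effects after, running sums)
  def run(): (Int, Int, List[Int]) = {
    val seen = mutable.ArrayBuffer.empty[Int]
    val scanned = Iterator(1, 2, 3).scanLeft(0) { (acc, x) =>
      seen += x // side effect, analogous to updateWorkMap
      acc + x
    }
    val before = seen.size     // the scan has not been consumed yet
    val sums = scanned.toList  // forces the whole scan
    val after = seen.size
    (before, after, sums)
  }
}
```

So the side effects on workMap become visible only after the scanned iterator has been fully traversed.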

Sure, I can do a reduce operation on the input data to derive the final workMap, but then I would need to scan the data twice. Since the input data set is huge, I want to avoid that too.

But unfortunately I need the workMaps at a later step.

So, is there a better way to solve the above problem? If I can't solve problem 2 at all (according to this), is there any other way to avoid either storing the R records (reportList) in a list or scanning the data more than once?

Upvotes: 1

Views: 219

Answers (1)

Phani Rahul

Reputation: 860

I don't yet have a better design for the second problem (avoiding combining reportList and workMap into a single data structure), but we can certainly avoid storing the R-type records in a list.

Here is how we can rewrite the calcPerPartition from the above question -

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
  Iterator[Option[(ReportKey, ReportVal)]] = {

    val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
    var finalWorkMap = true

    new Iterator[Option[(ReportKey, ReportVal)]](){
        override def hasNext: Boolean = itr.hasNext

        override def next(): Option[(ReportKey, ReportVal)] = {
            val (iKey, iVal) = itr.next()

            if (iKey.recordType == reportType) {
              Some(getNewReportRecord(workMap, iKey, iVal))
            }
            else {
              // W record: update workMap but emit nothing for it
              updateWorkMap(workMap, iKey, iVal)
              if (itr.hasNext) {
                next()
              }
              else {
                  if(finalWorkMap){
                    finalWorkMap = false //because we want a final only once
                    Some(workMap.map(convertToReport))
                  }
                  else {
                    None
                  }

              }
            }
        }
    }
  }

Instead of storing the results in a list, we defined an iterator. That resolved most of the memory issues we had with this problem.
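The same idea can also be sketched with the built-in lazy `flatMap` instead of a hand-written iterator. This is a minimal, self-contained illustration under assumptions, not the code we run: `W`, `R`, `Report` and `FinalWork` are hypothetical simplified types, and each report takes an immutable copy of the map, which may differ from what getNewReportRecord does.

```scala
import scala.collection.mutable

object LazyReports {
  // Hypothetical simplified input records
  sealed trait Record
  final case class W(key: String, value: Int) extends Record
  final case class R(name: String) extends Record

  // Hypothetical output: a report per R record, then the final work map
  sealed trait Out
  final case class Report(name: String, snapshot: Map[String, Int]) extends Out
  final case class FinalWork(totals: Map[String, Int]) extends Out

  def calc(itr: Iterator[Record]): Iterator[Out] = {
    val workMap = mutable.HashMap.empty[String, Int]

    // Lazy: each record is handled only when the consumer asks for more output
    val reports = itr.flatMap[Out] {
      case W(k, v) =>
        workMap.update(k, workMap.getOrElse(k, 0) + v) // merge, emit nothing
        Iterator.empty
      case R(name) =>
        Iterator.single(Report(name, workMap.toMap))   // snapshot so far
    }

    // Iterator.map is lazy, so the final snapshot is taken only when this
    // last element is requested, i.e. after all input has been consumed
    reports ++ Iterator.single(()).map(_ => FinalWork(workMap.toMap))
  }
}
```

With this shape no list of reports is built inside the function, and the final work map is emitted even when the last input record is of type R. A consumer that processes elements one at a time needs to hold only one snapshot at once.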

Upvotes: 0
