Problem
The input data has two types of records; let's call them R and W.
I need to traverse this data in sequence from top to bottom. If the current record is of type W, it has to be merged into a map (let's call it workMap): if the key of that W-type record is already present in the map, the record's value is added to the existing value; otherwise a new entry is created in workMap.
If the current record is of type R, the workMap calculated up to that record is attached to the current record.
For example, if the records arrive in this order:
W1- a -> 2
W2- b -> 3
W3- a -> 4
R1
W4- c -> 1
R2
W5- c -> 4
Here W1, W2, W3, W4 and W5 are of type W, and R1 and R2 are of type R.
At the end of this function, I should have the following:
R1 - { a -> 6, b -> 3 }                            // merged(W1, W2, W3)
R2 - { a -> 6, b -> 3, c -> 1 }                    // merged(W1, W2, W3, W4)
final workMap - { a -> 6, b -> 3, c -> 5 }         // merged(W1, W2, W3, W4, W5)
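The required semantics can be checked against the example above with a small eager sketch. It assumes a hypothetical `Record` type (`W(key, value)` and `R(name)`) and uses `String`/`Int` in place of `WorkKey`/`WorkVal`; none of these are the question's real classes. Each R record gets an immutable snapshot of the map built so far. Note that this version keeps every report in memory, so it only illustrates the expected output, not a fix for the memory problem.

```scala
object MergeExample {
  // Hypothetical record model, standing in for (InputKey, InputVal)
  sealed trait Record
  final case class W(key: String, value: Int) extends Record
  final case class R(name: String) extends Record

  // Pairs every R record with a snapshot of the W values merged before it,
  // and also returns the final merged map after the last record.
  def process(records: Seq[Record]): (List[(String, Map[String, Int])], Map[String, Int]) = {
    val (reportsReversed, finalMap) =
      records.foldLeft((List.empty[(String, Map[String, Int])], Map.empty[String, Int])) {
        case ((reports, work), W(k, v)) =>
          // add to an existing key, or create a new entry
          (reports, work.updated(k, work.getOrElse(k, 0) + v))
        case ((reports, work), R(name)) =>
          // Map is immutable, so later W records cannot change this snapshot
          ((name, work) :: reports, work)
      }
    (reportsReversed.reverse, finalMap)
  }

  // The example sequence from the question
  val example: Seq[Record] =
    Seq(W("a", 2), W("b", 3), W("a", 4), R("R1"), W("c", 1), R("R2"), W("c", 4))
}
```

Calling `MergeExample.process(MergeExample.example)` pairs R1 with `Map(a -> 6, b -> 3)` and R2 with `Map(a -> 6, b -> 3, c -> 1)`, and returns `Map(a -> 6, b -> 3, c -> 5)` as the final map.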
I want all the R-type records with the intermediate workMaps calculated up to that point attached, and the final workMap after the last record is processed.
Here is the code I have written:
import scala.collection.mutable

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
    Iterator[(ReportKey, ReportVal)] = {
  val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
  val reportList = mutable.ArrayBuffer.empty[(ReportKey, ReportVal)]
  while (itr.hasNext) {
    val (iKey, iVal) = itr.next()
    if (iKey.recordType == reportType) {
      // creates a new (ReportKey, ReportVal) from the current workMap
      reportList += getNewReportRecord(workMap, iKey, iVal)
    } else {
      // if iKey is already present, merge the values;
      // otherwise add a new entry
      updateWorkMap(workMap, iKey, iVal)
    }
  }
  val workList: Seq[(ReportKey, ReportVal)] = workMap.toList.map(convertToReport)
  reportList.iterator ++ workList.iterator
}
The ReportKey class looks like this:
case class ReportKey (
// the type of record - report or work
rType: Int,
date: String,
.....
)
There are two problems with this approach that I am asking for help with:
1. reportList, a list of R-type records with their intermediate workMaps attached, grows with the data, and I am running into OutOfMemoryErrors.
2. I have to combine the reportList and workMap records into the same data structure and then return them. If there is a more elegant way, I would definitely consider changing this design.
For the sake of completeness: I am using Spark. The function calcPerPartition is passed as the argument to mapPartitions on an RDD. I need the workMaps from each partition to do some additional calculations later.
I know that if I didn't have to return the workMaps from each partition, the problem would become much easier, like this:
...
val workMap = mutable.HashMap.empty[WorkKey, WorkVal]
val scan = itr.scanLeft[Option[(ReportKey, ReportVal)]](None) {
  (acc: Option[(ReportKey, ReportVal)], curr: (InputKey, InputVal)) =>
    if (curr._1.recordType == reportType) {
      Some(getNewReportRecord(workMap, curr._1, curr._2))
    } else {
      updateWorkMap(workMap, curr._1, curr._2)
      None
    }
}
val reportList = scan.filter(_.isDefined).map(_.get)
// workMap is still empty here: Iterator.scanLeft is lazy, so updateWorkMap
// only runs as reportList is consumed.
...
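The comment about workMap being empty follows from `Iterator.scanLeft` being lazy: the scan function, and therefore any side effect like `updateWorkMap`, runs only as the resulting iterator is consumed. A minimal self-contained demonstration, assuming a plain `Iterator[Int]` and a mutable counter map as stand-ins for the question's types:

```scala
import scala.collection.mutable

object LazyScanDemo {
  // Returns the map size right after calling scanLeft, and after draining the result.
  def sizes(): (Int, Int) = {
    val seen = mutable.HashMap.empty[Int, Int]
    val input = Iterator(1, 2, 1, 3)
    val scan = input.scanLeft(0) { (acc, x) =>
      // side effect standing in for updateWorkMap: count occurrences of x
      seen(x) = seen.getOrElse(x, 0) + 1
      acc + x
    }
    val before = seen.size // nothing has been consumed yet, so the map is empty
    scan.foreach(_ => ())  // draining the iterator runs the side effects
    (before, seen.size)
  }
}
```

So the map does get filled, but only after the returned iterator has been fully consumed, which in a `mapPartitions` call happens after `calcPerPartition` has already returned.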
Sure, I could run a reduce operation on the input data to derive the final workMap, but then I would need to read the data twice. Since the input data set is huge, I want to avoid that too.
But unfortunately, I need the workMaps at a later step.
So, is there a better way to solve the above problem? If I can't solve problem 2 at all (according to this), is there any other way to avoid storing the R records (reportList) in a list or scanning the data more than once?
Answer
I don't yet have a better design for the second question (whether you can avoid combining reportList and workMap into a single data structure), but we can certainly avoid storing R-type records in a list.
Here is how we can rewrite calcPerPartition from the question:
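import scala.collection.mutable

def calcPerPartition(itr: Iterator[(InputKey, InputVal)]):
    Iterator[(ReportKey, ReportVal)] = {
  val workMap = mutable.HashMap.empty[WorkKey, WorkVal]

  // Yields R-type records one at a time; W-type records only update workMap,
  // so nothing is accumulated in a list.
  val reports = new Iterator[(ReportKey, ReportVal)] {
    private var pending: Option[(ReportKey, ReportVal)] = None

    // Skip over W-type records until the next R-type record or the end of input.
    // If getNewReportRecord keeps a reference to workMap instead of a copy,
    // later updates will leak into records that were already emitted.
    private def advance(): Unit =
      while (pending.isEmpty && itr.hasNext) {
        val (iKey, iVal) = itr.next()
        if (iKey.recordType == reportType)
          pending = Some(getNewReportRecord(workMap, iKey, iVal))
        else
          updateWorkMap(workMap, iKey, iVal)
      }

    override def hasNext: Boolean = { advance(); pending.isDefined }

    override def next(): (ReportKey, ReportVal) = {
      if (!hasNext) throw new NoSuchElementException("no more report records")
      val rec = pending.get
      pending = None
      rec
    }
  }

  // Iterator's ++ takes its argument by name, so the final workMap is converted
  // only after every input record has been consumed, whether the last record
  // was of type R or W.
  reports ++ workMap.iterator.map(convertToReport)
}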
Instead of storing the results in a list, we defined an iterator, so records are produced lazily as Spark consumes them. That solved most of the memory issues we had.
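The same lazy idea can be written more compactly with `flatMap` instead of a hand-written Iterator. The sketch below is self-contained so it runs without Spark; the types `Record`, `W`, `R`, `Report` and `WorkEntry` (with `String` keys and `Int` values) are hypothetical stand-ins for the question's classes. Appending the final workMap with `++`, whose argument is evaluated lazily, is one possible way to return both kinds of output from a single iterator.

```scala
import scala.collection.mutable

object StreamingExample {
  // Hypothetical stand-ins for the question's input and output types
  sealed trait Record
  final case class W(key: String, value: Int) extends Record
  final case class R(name: String) extends Record

  sealed trait Out
  final case class Report(name: String, snapshot: Map[String, Int]) extends Out
  final case class WorkEntry(key: String, total: Int) extends Out

  def calcPerPartition(itr: Iterator[Record]): Iterator[Out] = {
    val workMap = mutable.HashMap.empty[String, Int]

    // W records only update workMap; each R record is emitted as soon as it is
    // reached, carrying an immutable copy of workMap, so no report list is kept
    val reports: Iterator[Out] = itr.flatMap {
      case W(k, v) =>
        workMap(k) = workMap.getOrElse(k, 0) + v
        Iterator.empty
      case R(name) =>
        Iterator.single(Report(name, workMap.toMap))
    }

    // Iterator's ++ takes its argument by name, so workMap is read only after
    // every input record has been consumed
    reports ++ workMap.iterator.map { case (k, total) => WorkEntry(k, total) }
  }
}
```

On the question's example, this yields the reports for R1 and R2 first, followed by one `WorkEntry` per key of the final map (in unspecified `HashMap` order). With Spark, a function of this shape would be passed to `rdd.mapPartitions`.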