Reputation: 18830
I have the following code, where I maintain a large List. What I do here is go over the data stream and create an inverted index. I use the Twitter Scalding API, and dataTypePipe is of type TypedPipe:
lazy val cats = dataTypePipe.cross(cmsCats)
  .map(vf => (vf._1.itemId, vf._1.leafCats, vf._2))
  .flatMap {
    case (id, categorySet, cHhitters) => categorySet.map(cat => (
      ...
  }
  .filter(f => f._2.nonEmpty)
  .group.withReducers(4000)
  .sum
  .map {
    case ((token, bucket), ids) =>
      toIndexedRecord(ids, token, bucket)
  }
Due to a serialization issue, I convert the Scala list to a Java list and write it out with Avro:
import scala.collection.JavaConverters._

def toIndexedRecord(ids: List[Long], token: String, bucket: Int): IndexRecord = {
  // need to convert from scala.Long to java.lang.Long before calling .asJava
  val javaList = ids.map(l => l: java.lang.Long).asJava
  new IndexRecord(token, bucket, javaList)
}
But the issue is that the large amount of information kept in the list causes a Java heap issue. I believe the summing is also a contributor to this issue:
2013-08-25 16:41:09,709 WARN org.apache.hadoop.mapred.Child: Error running child
cascading.pipe.OperatorException: [_pipe_0*_pipe_1][com.twitter.scalding.GroupBuilder$$anonfun$1.apply(GroupBuilder.scala:189)] operator Every failed executing operation: MRMAggregator[decl:'value']
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1232)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
at scala.collection.immutable.List.$plus$plus(List.scala:193)
at com.twitter.algebird.ListMonoid.plus(Monoid.scala:86)
at com.twitter.algebird.ListMonoid.plus(Monoid.scala:84)
at com.twitter.scalding.KeyedList$$anonfun$sum$1.apply(TypedPipe.scala:264)
at com.twitter.scalding.MRMAggregator.aggregate(Operations.scala:279)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
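If I read the trace correctly, the sum combines the per-key lists with Algebird's ListMonoid, which concatenates them in memory. A minimal illustration (assuming the grouped values are List[Long], as in my pipe):

import com.twitter.algebird.ListMonoid

val monoid = new ListMonoid[Long]
// Each plus builds a brand-new, longer list, so the full per-key list is
// materialized on the reducer before toIndexedRecord ever sees it.
monoid.plus(List(1L, 2L), List(3L)) // List(1L, 2L, 3L)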
So my question is: what can I do to avoid this situation?
Upvotes: 0
Views: 818
Reputation: 1974
Try .forceToReducers before the .sum. This OOM is happening map-side, as we cache values there; that may not help in your case, though. If the lists are truly too large, however, there is really very little that can be done.
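For example (a sketch against the pipeline from the question; only the placement of the call matters here):

.group.withReducers(4000)
.forceToReducers // skip map-side caching; combine the lists on the reducers only
.sum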
Upvotes: 3
Reputation: 31533
Quick, but unscalable answer: try increasing mapred.child.java.opts.
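You can pass -Dmapred.child.java.opts=-Xmx4096m on the hadoop jar command line, or (a sketch, assuming a Scalding version whose Job exposes a config override) set it in the job itself:

import com.twitter.scalding.{Args, Job}

// Hypothetical job name; the relevant part is the config override.
class MyIndexJob(args: Args) extends Job(args) {
  // Bump the child JVM heap; 4g is an arbitrary example value.
  override def config: Map[AnyRef, AnyRef] =
    super.config ++ Map("mapred.child.java.opts" -> "-Xmx4096m")
}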
Better answer: well, it's a little tricky to understand the question, because I don't know the types of your vals and I don't know what f and vf are, since you haven't named them informatively. If you provide the minimal amount of code required so I can paste it into an IDE and have a play around, then I might find your problem.

sum might be where the OOM happens, but it is not what is causing it; refactoring to do the sum in a different way won't help.

Chances are you're crossing on something too big to fit in memory, so mapred.child.java.opts might be the only solution for you unless you completely restructure your data. Note that cross calls crossWithTiny, and tiny means tiny :)
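To illustrate what cross does (a minimal sketch; TypedPipe.from on an in-memory Iterable is assumed here for brevity):

import com.twitter.scalding.typed.TypedPipe

val big: TypedPipe[Int] = TypedPipe.from(List(1, 2, 3))
val tiny: TypedPipe[String] = TypedPipe.from(List("a", "b"))
// cross pairs every element of big with every element of tiny; under the
// hood crossWithTiny replicates the tiny side to every task, so it must
// fit comfortably in a single task's memory.
val product: TypedPipe[(Int, String)] = big.cross(tiny)
// (1,"a"), (1,"b"), (2,"a"), (2,"b"), (3,"a"), (3,"b")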
Upvotes: 1