Peter Krauss

Reputation: 13930

How to avoid OutOfMemoryError with a small ArrayBuffer in the scope of a small function?

The function scanFolder() runs, but sometimes the exception below is produced:

// imports needed for this snippet (not shown in the original):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

object MyClass {
    // ... etc
    val fs = FileSystem.get(new Configuration())
    // .. etc
    val dbs = scanFolder(warehouse)
    val dbs_prod = dbs.filter( s => db_regex.findFirstIn(s).isDefined )
    for (db <- dbs_prod)
      for (t <- scanFolder(db)) {
            var parts = 0; var size = 0L
            fs.listStatus( new Path(t) ).foreach( p => {
                parts = parts + 1
                try { // a partition file can be lost during the loop, producing "file not found"
                    size  = size  + fs.getContentSummary(p.getPath).getLength
                } catch { case _: Throwable => }
            }) // p loop, partitions
            allVals.append(  s"('${cutPath(db)}','${cutPath(t)}',${parts},${size},'$dbHoje')"  )
            if (trStrange_count>0) contaEstranhos += 1
      }

    def scanFolder(thePath: String, toCut: Boolean = false) : ArrayBuffer[String] = {
        val lst = ArrayBuffer[String]()
        fs.listStatus( new Path(thePath) ).foreach(
            x => lst.append(  cutPath(x.getPath.toString,toCut)  )
        )
        lst.sorted
    }
}

The error:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.util.Arrays.copyOf(Arrays.java:3332)
        at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
        at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
        at java.lang.StringBuilder.append(StringBuilder.java:136)
        at org.apache.hadoop.fs.Path.<init>(Path.java:109)
        at org.apache.hadoop.fs.Path.<init>(Path.java:93)
        ...

(edit after more tests)

I am using Scala v2.11 (and saving data after the loops using Spark v2.2).

I changed the code, eliminating all calls to the function scanFolder(), to avoid the use of ArrayBuffer. It now iterates directly with fs.listStatus( new Path(x) ).foreach( ...code... ) in the second loop.
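
Roughly, the second loop now looks like the sketch below (it assumes the same fs, dbs_prod, cutPath, allVals and dbHoje definitions as in the snippet above; the real body is abbreviated):

// sketch only; fs, dbs_prod, cutPath, allVals and dbHoje come from the original snippet
for (db <- dbs_prod)
  fs.listStatus( new Path(db) ).foreach { t =>
    var parts = 0; var size = 0L
    fs.listStatus( t.getPath ).foreach { p =>
      parts += 1
      try { // a partition file can be lost during the loop, producing "file not found"
        size += fs.getContentSummary(p.getPath).getLength
      } catch { case _: Throwable => }
    }
    allVals.append( s"('${cutPath(db)}','${cutPath(t.getPath.toString)}',$parts,$size,'$dbHoje')" )
  }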

The program ran for ~30 minutes, producing messages like these along the way:

Exception in thread "LeaseRenewer:spdq@TLVBRPRDK" java.lang.OutOfMemoryError: Java heap space
Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dispatcher-event-loop-23" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dispatcher-event-loop-39" java.lang.OutOfMemoryError: Java heap space

Final error message, stopping the program:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SparkListenerBus"
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
19/10/16 18:42:14 WARN DefaultPromise: An exception was thrown by org.apache.spark.network.server.TransportRequestHandler$$Lambda$16/773004452.operationComplete()
java.lang.OutOfMemoryError: Java heap space
19/10/16 18:42:14 WARN DefaultChannelPipeline: An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: Java heap space

Upvotes: 0

Views: 1464

Answers (2)

Peter Krauss

Reputation: 13930

After @SergeyRomanovsky's good clues I solved the problem. Here is the fragment I used, tested first with spark-shell (sshell in my terminal):

  1. Add memory with the usual options: sshell --driver-memory 12G --executor-memory 24G.

  2. Remove the innermost (and problematic) loop, reducing the count to parts = fs.listStatus( new Path(t) ).length and enclosing it in a try block.

  3. Add one more try block so that the innermost loop runs only after the .length try succeeds.

  4. Reduce the ArrayBuffer[] variables to a minimum, removing the old scanFolder().

Complete fragment:

// ... val allVals = ArrayBuffer[String]()
// for loop:
      var parts = -1   // -1 marks a failed listing
      var size = -1L   // -1 marks a failed listing
      try { // a partition file can be lost during the loop, producing "file not found"
          val pp = fs.listStatus( new Path(t) )
          parts = pp.length
          size = 0L // listing succeeded, start accumulating
          pp.foreach( p => {
              try { // timeouts and other filesystem errors
                  size = size + fs.getContentSummary(p.getPath).getLength
              } catch { case _: Throwable => }
          }) // p loop, partitions
      } catch { case _: Throwable => }
      allVals.append(  s"('${dbCut}','${cutPath(t)}',$parts,$size,'$dbHoje')"  )

PS: for item 1 in a compiled application, use SparkSession.builder()... .config("spark.executor.memory", "24G").config("spark.driver.memory", "12G")
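
A minimal sketch of that builder configuration (the appName is a placeholder; note that in client mode spark.driver.memory usually has to be set before the JVM starts, e.g. via spark-submit):

import org.apache.spark.sql.SparkSession

// sketch only: appName is hypothetical, memory values as in item 1
val spark = SparkSession.builder()
  .appName("scanHiveWarehouse")
  .config("spark.driver.memory", "12G")    // in client mode this may need to go on spark-submit instead
  .config("spark.executor.memory", "24G")
  .getOrCreate()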

Upvotes: 0

Sergey Romanovsky

Reputation: 4554

I don't have the answer to your specific question. But I'd like to give some recommendations which could be helpful:

  • prefer a stream style of processing, i.e. avoid loops. Use map, filter, fold, etc. Don't use global state like lst and allVals in your code snippet (see the sketch after this list).
  • avoid unnecessary sorting. scanFolder ends with a sort for no clear reason.
  • consider adding more heap memory for your task; this will reduce GC pressure.
  • use a JVM profiler to narrow down the specific piece of code that is greedy for memory allocations. In your case you won't need that, as it is pretty obvious. But in trickier cases it can help a lot.
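
For illustration only, a loop-free version of the table scan could look like this sketch (it reuses fs, cutPath, dbs_prod and dbHoje from the question; tableRow is a hypothetical helper):

import org.apache.hadoop.fs.Path

// sketch of a stream-style scan without mutable global state
// (fs, cutPath, dbs_prod and dbHoje are assumed from the question)
def tableRow(db: String, table: Path): String = {
  val partitions = fs.listStatus(table)
  val size = partitions.map { p =>
    try fs.getContentSummary(p.getPath).getLength
    catch { case _: Throwable => 0L } // a partition may disappear mid-scan
  }.sum
  s"('${cutPath(db)}','${cutPath(table.toString)}',${partitions.length},$size,'$dbHoje')"
}

val allVals: Seq[String] =
  dbs_prod.flatMap(db => fs.listStatus(new Path(db)).map(t => tableRow(db, t.getPath)))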

Upvotes: 1
