Reputation: 13930
The function scanFolder() usually works, but sometimes the exception below is produced:
object MyClass{
// ... etc
val fs = FileSystem.get(new Configuration())
// .. etc
val dbs = scanFolder(warehouse)
val dbs_prod = dbs.filter( s => db_regex.findFirstIn(s).isDefined )
for (db <- dbs_prod)
for (t <- scanFolder(db)) {
var parts = 0; var size = 0L
fs.listStatus( new Path(t) ).foreach( p => {
parts = parts + 1
try { // a partition file can disappear during the loop, producing "file not found"
size = size + fs.getContentSummary(p.getPath).getLength
} catch { case _: Throwable => }
}) // p loop, partitions
allVals.append( s"('${cutPath(db)}','${cutPath(t)}',${parts},${size},'$dbHoje')" )
if (trStrange_count>0) contaEstranhos += 1
}
def scanFolder(thePath: String, toCut: Boolean = false) : ArrayBuffer[String] = {
val lst = ArrayBuffer[String]()
fs.listStatus( new Path(thePath) ).foreach(
x => lst.append( cutPath(x.getPath.toString,toCut) )
)
lst.sorted
}
}
The error:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at org.apache.hadoop.fs.Path.<init>(Path.java:109)
at org.apache.hadoop.fs.Path.<init>(Path.java:93)
...
I am using Scala v2.11 (and I save the data after the loops using Spark v2.2).
I changed the code, eliminating all calls to scanFolder() to avoid using ArrayBuffer. It now calls fs.listStatus( new Path(x) ).foreach( ...code... ) directly in the second loop.
... The program ran for ~30 minutes, printing messages such as:
Exception in thread "LeaseRenewer:spdq@TLVBRPRDK" java.lang.OutOfMemoryError: Java heap space
Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dispatcher-event-loop-23" java.lang.OutOfMemoryError: Java heap space
Exception in thread "dispatcher-event-loop-39" java.lang.OutOfMemoryError: Java heap space
Final error message, stopping the program:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SparkListenerBus"
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
19/10/16 18:42:14 WARN DefaultPromise: An exception was thrown by org.apache.spark.network.server.TransportRequestHandler$$Lambda$16/773004452.operationComplete()
java.lang.OutOfMemoryError: Java heap space
19/10/16 18:42:14 WARN DefaultChannelPipeline: An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: Java heap space
Upvotes: 0
Views: 1464
Reputation: 13930
Following good clues from @SergeyRomanovsky, I solved the problem. Here are the changes, which I first tried with Spark-Shell (sshell on my terminal):
1. Add memory with the most common directives: sshell --driver-memory 12G --executor-memory 24G
2. Remove the innermost (and problematic) loop, reducing it to parts = fs.listStatus( new Path(t) ).length and enclosing it in a try block.
3. Add one more try block, so the innermost loop runs only after the .length try succeeds.
4. Reduce the ArrayBuffer[] variables to a minimum, removing the old scanFolder().
Complete fragment:
// ... val allVals = ArrayBuffer[String]()
// for loop:
var parts = -1
var size = -1L
try { // a partition file can disappear during the loop, producing "file not found"
val pp = fs.listStatus( new Path(t) )
parts = pp.length
size = 0L // listing succeeded: reset the -1 sentinel so the sum is not off by one
pp.foreach( p => {
try { // timeouts and other filesystem errors
size = size + fs.getContentSummary(p.getPath).getLength
} catch { case _: Throwable => }
}) // p loop, partitions
} catch { case _: Throwable => }
allVals.append( s"('${dbCut}','${cutPath(t)}',$parts,$size,'$dbHoje')" )
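A possible variant of the fragment above, as a sketch only. It assumes Hadoop 2.7 or later (for FileSystem.listStatusIterator), and tableStats is a hypothetical helper name, not something from my original program. It catches only IOException (which includes FileNotFoundException), so an OutOfMemoryError is not silently swallowed the way it is with case _: Throwable, and it starts size at 0 so the total is not offset by the -1 sentinel.

```scala
import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

object TableStats {
  /** Returns (parts, size) for one table directory,
    * or (-1, -1L) if the directory itself cannot be listed. */
  def tableStats(fs: FileSystem, table: Path): (Int, Long) =
    try {
      // RemoteIterator: on HDFS the entries arrive in batches
      // instead of one large array.
      val it = fs.listStatusIterator(table)
      var parts = 0
      var size = 0L
      while (it.hasNext) {
        val p = it.next()
        parts += 1
        try size += fs.getContentSummary(p.getPath).getLength
        catch { case _: IOException => () } // partition vanished or timed out: skip it
      }
      (parts, size)
    } catch { case _: IOException => (-1, -1L) }
}
```

Inside the for loop it would be used as val (parts, size) = TableStats.tableStats(fs, new Path(t)), followed by the same allVals.append(...) line.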
PS: for item 1 in a compiled program, use SparkSession.builder()... .config("spark.executor.memory", "24G").config("spark.driver.memory", "12G")
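A minimal sketch of that builder configuration, with a hypothetical object and application name. One caveat I am fairly sure of from the Spark configuration docs: in client mode, spark.driver.memory set from inside the application has no effect, because the driver JVM has already started. Since the folder scan here runs on the driver (the stack trace is in thread "main"), the driver memory is probably better passed at launch time, e.g. with spark-submit --driver-memory 12G.

```scala
import org.apache.spark.sql.SparkSession

object ScanWarehouse { // hypothetical object name
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("scan-warehouse")              // hypothetical application name
      .config("spark.executor.memory", "24G") // read when executors are launched
      // spark.driver.memory is not set here: in client mode the driver JVM
      // is already running, so pass --driver-memory 12G at launch instead.
      .getOrCreate()

    // ... folder scan and the final save go here ...

    spark.stop()
  }
}
```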
Upvotes: 0
Reputation: 4554
I don't have the answer to your specific question, but I'd like to give some recommendations that could be helpful:
- Use functional operations such as map, filter, fold, etc.
- Don't use global state like lst and allVals in your code snippet.
- scanFolder ends with sorting without a clear reason.
Upvotes: 1
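To illustrate the recommendations above, here is a rough sketch in plain Scala with no mutable buffers. The names FunctionalScan, TableRow, collectRows, list and sizeOf are all hypothetical. The directory listing and the size lookup are passed in as functions, so with Hadoop list could be something like p => fs.listStatus(new Path(p)).map(_.getPath.toString).toVector (an assumption, not tested against your cluster).

```scala
object FunctionalScan {
  final case class TableRow(db: String, table: String, parts: Int, size: Long)

  /** Builds one row per table without shared mutable state.
    * `list` returns the children of a path, `sizeOf` the size of one partition. */
  def collectRows(list: String => Seq[String], sizeOf: String => Long)(
      warehouse: String,
      keepDb: String => Boolean
  ): Seq[TableRow] =
    for {
      db <- list(warehouse) if keepDb(db) // filter replaces the separate dbs_prod step
      t  <- list(db)
      partitions = list(t)
    } yield TableRow(
      db,
      t,
      partitions.size,
      partitions.foldLeft(0L)((acc, p) => acc + sizeOf(p)) // fold instead of a var
    )
}
```

The result is an immutable Seq that can be turned into the insert strings with a single map. Nothing is sorted here; sorting can be added at the end if the output order actually matters.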