Reputation: 103
Our main goal is that we want to perform operations on a large amount of input data (around 80 GB). The problem is that even for smaller datasets, we often get java heap space or other memory related errors.
Our temporary solution was to simply specify a higher maximum heap size (using -Xmx
locally or by setting spark.executor.memory
and spark.driver.memory
for our spark instance) but this does not seem to generalize well, we still run into errors for bigger datasets or higher zoom levels.
For better understanding, here is the basic concept of what we do with our data:
Load the data using HadoopGeoTiffRDD.spatial(new Path(path))
Map the data to the tiles of some zoom level
val extent = geotrellis.proj4.CRS.fromName("EPSG:4326").worldExtent
val layout = layoutForZoom(zoom, extent)
val metadata: TileLayerMetadata[SpatialKey] = dataSet.collectMetadata[SpatialKey](layout)
val rdd = ContextRDD(dataSet.tileToLayout[SpatialKey](metadata), metadata)
Where layoutForZoom
is basically the same as geotrellis.spark.tiling.ZoomedLayoutScheme.layoutForZoom
Then we perform some operations on the entries of the rdd using rdd.map
and rdd.foreach
for the mapped rdds.
We aggregate the results of four tiles which correspond to a single tile in a higher zoom level using groupByKey
Go to 3 until we reached a certain zoom level
The goal would be: Given a memory limit of X GB, partition and work on the data in a way that we consume at most X GB.
It seems like the tiling of the dataset via tileToLayout
already takes too much memory on higher zoom levels (even for very small data sets). Are there any alternatives for tiling and loading the data according to some LayoutDefinition? As far as we understand, HadoopGeoTiffRDD.spatial
already splits the dataset into small regions which are then divided into the tiles by tileToLayout
. Is it somehow possible to directly load the dataset corresponding to the LayoutDefinition?
In our concrete scenario we have 3 workers with 2GB RAM and 2 cores each. On one of them is running the spark master too, which gets its work via spark-submit from a driver instance. We tried configurations like this:
val conf = new SparkConf().setAppName("Converter").setMaster("spark://IP-ADDRESS:PORT")
.set("spark.executor.memory", "900m") // to be below the available 1024 MB of default slave RAM
.set("spark.memory.fraction", "0.2") // to get more usable heap space
.set("spark.executor.cores", "2")
.set("spark.executor.instances", "3")
An example for a heap space error at the tiling step (step 2):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, 192.168.0.2, executor 1): java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:128)
at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:134)
at scala.collection.mutable.ArrayBuilder$ofByte.sizeHint(ArrayBuilder.scala:139)
at scala.collection.IndexedSeqOptimized$class.slice(IndexedSeqOptimized.scala:115)
at scala.collection.mutable.ArrayOps$ofByte.slice(ArrayOps.scala:198)
at geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
at geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
at geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
at geotrellis.spark.TileLayerMetadata$.collectMetadataWithCRS(TileLayerMetadata.scala:147)
at geotrellis.spark.TileLayerMetadata$.fromRdd(TileLayerMetadata.scala:281)
at geotrellis.spark.package$withCollectMetadataMethods.collectMetadata(package.scala:212)
...
Update:
I extracted an example from my code and uploaded it to the repository at https://gitlab.com/hwuerz/geotrellis-spark-example. You can run the example locally using sbt run
and selecting the class demo.HelloGeotrellis
. This will create the tiles for a tiny input dataset example.tif
according to our layout definition starting at zoom level 20 (using two cores per default, can be adjusted in the file HelloGeotrellis.scala
~ if level 20 somehow still works, it will most likely fail using higher values for bottomLayer
).
To run the code on the Spark Cluster, I use the following command:
`sbt package && bash submit.sh --dataLocation /mnt/glusterfs/example.tif --bottomLayer 20 --topLayer 10 --cesiumTerrainDir /mnt/glusterfs/terrain/ --sparkMaster spark://192.168.0.8:7077`
Wheresubmit.sh
basically runs spark-submit
(see the file in the repo).
The example.tif
is included in the repo within the directory DebugFiles
. In my setup the file is distributed via glusterfs which is why the path points to this location. The cesiumTerrainDir
is just an directory where we store our generated output.
We think the main problem might be that using the given api calls, geotrellis loads the complete structure of the layout into the memory, which is too big for higher zoom levels. Is there any way to avoid this?
Upvotes: 1
Views: 371