FriendlyFruit

Reputation: 103

How to analyze and limit the memory usage of GeoTrellis and Spark for tiling

Our main goal is to perform operations on a large amount of input data (around 80 GB). The problem is that even for smaller datasets we often get Java heap space or other memory-related errors.

Our temporary solution was simply to specify a higher maximum heap size (using -Xmx locally, or by setting spark.executor.memory and spark.driver.memory for our Spark instance), but this does not generalize well: we still run into errors for bigger datasets or higher zoom levels.
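
For reference, the shape of those settings looks roughly like this; the values below are placeholders, not our real configuration (the cluster configuration we actually tried is shown further down):

    import org.apache.spark.{SparkConf, SparkContext}

    // Placeholder values, only to show where the limits are set.
    // Note: spark.driver.memory has to be set before the driver JVM starts,
    // e.g. via spark-submit --driver-memory or spark-defaults.conf;
    // setting it on the SparkConf of an already running driver has no effect.
    val conf = new SparkConf()
      .setAppName("Converter")
      .set("spark.executor.memory", "2g") // max heap per executor
      .set("spark.driver.memory", "2g")   // max heap of the driver (see note)
    val sc = new SparkContext(conf)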

For a better understanding, here is the basic concept of what we do with our data:

  1. Load the data using HadoopGeoTiffRDD.spatial(new Path(path))

  2. Map the data to the tiles of some zoom level

    // world extent of the target CRS (EPSG:4326)
    val extent = geotrellis.proj4.CRS.fromName("EPSG:4326").worldExtent
    // layout definition for the requested zoom level
    val layout = layoutForZoom(zoom, extent)
    // scan the input RDD to collect the layer metadata for this layout
    val metadata: TileLayerMetadata[SpatialKey] = dataSet.collectMetadata[SpatialKey](layout)
    // cut the input into tiles matching the layout and attach the metadata
    val rdd = ContextRDD(dataSet.tileToLayout[SpatialKey](metadata), metadata)
    

    Where layoutForZoom is basically the same as geotrellis.spark.tiling.ZoomedLayoutScheme.layoutForZoom (a sketch follows below this list).

  3. Then we perform some operations on the entries of the RDD using rdd.map and rdd.foreach on the mapped RDDs.

  4. We aggregate the results of the four tiles that correspond to a single tile in a higher zoom level using groupByKey (see the second sketch below this list)

  5. Go back to 3 until we reach a certain zoom level
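
To make the steps above more concrete, here is a sketch of what layoutForZoom in step 2 roughly does. Our real implementation is equivalent to ZoomedLayoutScheme.layoutForZoom; the tile size of 256 is an assumption in this sketch, and the extent parameter is only kept to match the call site above (ZoomedLayoutScheme derives the world extent from the CRS itself):

    import geotrellis.proj4.CRS
    import geotrellis.spark.tiling.{LayoutDefinition, ZoomedLayoutScheme}
    import geotrellis.vector.Extent

    // Sketch of layoutForZoom; the tile size of 256 is assumed here.
    def layoutForZoom(zoom: Int, extent: Extent): LayoutDefinition =
      ZoomedLayoutScheme(CRS.fromName("EPSG:4326"), tileSize = 256)
        .levelForZoom(zoom)
        .layout

And here is a simplified sketch of steps 3 to 5; processTile and mergeChildren are placeholders for our actual per-tile operations, not code from our project:

    import geotrellis.raster.Tile
    import geotrellis.spark.SpatialKey
    import org.apache.spark.rdd.RDD

    // Simplified sketch of one pyramid step (steps 3 to 5); processTile and
    // mergeChildren stand in for our actual operations.
    def pyramidStep(tiles: RDD[(SpatialKey, Tile)],
                    processTile: Tile => Tile,
                    mergeChildren: Iterable[Tile] => Tile): RDD[(SpatialKey, Tile)] =
      tiles
        .map { case (key, tile) => (key, processTile(tile)) } // step 3
        .map { case (SpatialKey(col, row), tile) =>
          // key of the parent tile one level up the pyramid
          (SpatialKey(col / 2, row / 2), tile)
        }
        .groupByKey()             // step 4: the four children end up under one key
        .mapValues(mergeChildren) // aggregate them into a single tile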

The goal would be: Given a memory limit of X GB, partition and work on the data in a way that we consume at most X GB.

It seems like the tiling of the dataset via tileToLayout already takes too much memory at higher zoom levels (even for very small datasets). Are there any alternatives for tiling and loading the data according to some LayoutDefinition? As far as we understand, HadoopGeoTiffRDD.spatial already splits the dataset into small regions, which are then divided into tiles by tileToLayout. Is it somehow possible to directly load the dataset corresponding to the LayoutDefinition?
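
To clarify what we mean by loading the data in smaller pieces, here is a sketch of step 1 with explicit HadoopGeoTiffRDD.Options. We are not sure whether these are the right options or values; they are only guesses for illustration:

    import geotrellis.raster.Tile
    import geotrellis.spark.io.hadoop.HadoopGeoTiffRDD
    import geotrellis.vector.ProjectedExtent
    import org.apache.hadoop.fs.Path
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Sketch only: the option values are guesses, not known-good settings.
    def loadWindowed(path: String)(implicit sc: SparkContext): RDD[(ProjectedExtent, Tile)] =
      HadoopGeoTiffRDD.spatial(
        new Path(path),
        HadoopGeoTiffRDD.Options(
          maxTileSize = Some(256),    // ask the reader for small windows
          numPartitions = Some(100)   // spread the windows over more partitions
        )
      )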

In our concrete scenario we have 3 workers with 2 GB RAM and 2 cores each. The Spark master also runs on one of them and gets its work via spark-submit from a driver instance. We tried configurations like this:

val conf = new SparkConf().setAppName("Converter").setMaster("spark://IP-ADDRESS:PORT")
  .set("spark.executor.memory", "900m") // to be below the available 1024 MB of default slave RAM
  .set("spark.memory.fraction", "0.2") // to get more usable heap space
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "3")

An example of a heap space error at the tiling step (step 2):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 5, 192.168.0.2, executor 1): java.lang.OutOfMemoryError: Java heap space
        at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:128)
        at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:134)
        at scala.collection.mutable.ArrayBuilder$ofByte.sizeHint(ArrayBuilder.scala:139)
        at scala.collection.IndexedSeqOptimized$class.slice(IndexedSeqOptimized.scala:115)
        at scala.collection.mutable.ArrayOps$ofByte.slice(ArrayOps.scala:198)
        at geotrellis.util.StreamingByteReader.getBytes(StreamingByteReader.scala:98)
        at geotrellis.raster.io.geotiff.LazySegmentBytes.getBytes(LazySegmentBytes.scala:104)
        at geotrellis.raster.io.geotiff.LazySegmentBytes.readChunk(LazySegmentBytes.scala:81)
        at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
        at geotrellis.raster.io.geotiff.LazySegmentBytes$$anonfun$getSegments$1.apply(LazySegmentBytes.scala:99)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:185)
        at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1012)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$15.apply(RDD.scala:1010)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2118)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
        at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
        at geotrellis.spark.TileLayerMetadata$.collectMetadataWithCRS(TileLayerMetadata.scala:147)
        at geotrellis.spark.TileLayerMetadata$.fromRdd(TileLayerMetadata.scala:281)
        at geotrellis.spark.package$withCollectMetadataMethods.collectMetadata(package.scala:212)
        ...

Update:

I extracted an example from my code and uploaded it to the repository at https://gitlab.com/hwuerz/geotrellis-spark-example. You can run the example locally using sbt run and selecting the class demo.HelloGeotrellis. This creates the tiles for a tiny input dataset example.tif according to our layout definition, starting at zoom level 20 (using two cores by default; this can be adjusted in the file HelloGeotrellis.scala). If level 20 somehow still works, it will most likely fail with higher values for bottomLayer.

To run the code on the Spark Cluster, I use the following command:

    sbt package && bash submit.sh --dataLocation /mnt/glusterfs/example.tif --bottomLayer 20 --topLayer 10 --cesiumTerrainDir /mnt/glusterfs/terrain/ --sparkMaster spark://192.168.0.8:7077

Where submit.sh basically runs spark-submit (see the file in the repo).

The example.tif is included in the repo in the directory DebugFiles. In my setup the file is distributed via GlusterFS, which is why the path points to this location. The cesiumTerrainDir is just a directory where we store our generated output.

We think the main problem might be that, with the given API calls, GeoTrellis loads the complete structure of the layout into memory, which becomes too big at higher zoom levels. Is there any way to avoid this?

Upvotes: 1

Views: 371

Answers (0)
