Wang Wei

Reputation: 343

Spark 2.4.0 still having 2GB limit on shuffle block size?

I am aware of the 2GB limit issue, and I have already set spark.maxRemoteBlockSizeFetchToMem to less than 2GB.
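For reference, the config is set roughly like this (a simplified sketch; the app name and the exact value are placeholders, not my real setup):

    import org.apache.spark.sql.SparkSession

    // Remote shuffle blocks larger than this are fetched to disk instead of
    // memory; the value just needs to stay below the 2GB cap.
    val spark = SparkSession.builder()
      .appName("large-aggregation")                          // placeholder name
      .config("spark.maxRemoteBlockSizeFetchToMem", "2000m") // < 2GB
      .getOrCreate()

Yet I still see the 2GB limit being hit: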

19/03/30 06:48:43 INFO CoarseGrainedExecutorBackend: Got assigned task 2008
19/03/30 06:48:43 INFO Executor: Running task 195.0 in stage 4.0 (TID 2008)
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Getting 289 non-empty blocks including 68 local blocks and 221 remote blocks
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Started 3 remote fetches in 1 ms
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Getting 270 non-empty blocks including 67 local blocks and 203 remote blocks
19/03/30 06:48:43 INFO ShuffleBlockFetcherIterator: Started 3 remote fetches in 3 ms
19/03/30 06:48:43 INFO ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (128 entries), spilling and falling back to sort based aggregation. You may change the threshold by adjust option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
19/03/30 06:48:43 INFO ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (128 entries), spilling and falling back to sort based aggregation. You may change the threshold by adjust option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
19/03/30 06:48:43 INFO MemoryStore: Block rdd_23_160 stored as values in memory (estimated size 42.0 MB, free 10.2 GB)
19/03/30 06:48:43 INFO Executor: Finished task 160.0 in stage 4.0 (TID 1973). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_170 stored as values in memory (estimated size 49.9 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 170.0 in stage 4.0 (TID 1983). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_148 stored as values in memory (estimated size 79.8 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 148.0 in stage 4.0 (TID 1962). 2184 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_168 stored as values in memory (estimated size 46.9 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 168.0 in stage 4.0 (TID 1981). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_179 stored as values in memory (estimated size 72.7 MB, free 10.2 GB)
19/03/30 06:48:44 INFO Executor: Finished task 179.0 in stage 4.0 (TID 1994). 2141 bytes result sent to driver
19/03/30 06:48:44 INFO MemoryStore: Block rdd_23_151 stored as values in memory (estimated size 59.9 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 151.0 in stage 4.0 (TID 1964). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_177 stored as values in memory (estimated size 52.2 MB, free 10.3 GB)
19/03/30 06:48:45 INFO Executor: Finished task 177.0 in stage 4.0 (TID 1990). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_166 stored as values in memory (estimated size 140.4 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 166.0 in stage 4.0 (TID 1979). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_183 stored as values in memory (estimated size 44.0 MB, free 10.3 GB)
19/03/30 06:48:45 INFO Executor: Finished task 183.0 in stage 4.0 (TID 1996). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_195 stored as values in memory (estimated size 47.0 MB, free 10.4 GB)
19/03/30 06:48:45 INFO Executor: Finished task 195.0 in stage 4.0 (TID 2008). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_192 stored as values in memory (estimated size 92.1 MB, free 10.3 GB)
19/03/30 06:48:45 INFO Executor: Finished task 192.0 in stage 4.0 (TID 2006). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_182 stored as values in memory (estimated size 93.4 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 182.0 in stage 4.0 (TID 1995). 2141 bytes result sent to driver
19/03/30 06:48:45 INFO MemoryStore: Block rdd_23_171 stored as values in memory (estimated size 125.3 MB, free 10.2 GB)
19/03/30 06:48:45 INFO Executor: Finished task 171.0 in stage 4.0 (TID 1988). 2141 bytes result sent to driver
19/03/30 06:48:46 INFO ObjectAggregationIterator: Aggregation hash map reaches threshold capacity (128 entries), spilling and falling back to sort based aggregation. You may change the threshold by adjust option spark.sql.objectHashAggregate.sortBased.fallbackThreshold
19/03/30 06:49:02 WARN BlockManager: Putting block rdd_23_167 failed due to exception java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 1504 because the size after growing exceeds size limitation 2147483632.
19/03/30 06:49:02 WARN BlockManager: Block rdd_23_167 could not be removed as it was not found on disk or in memory
19/03/30 06:49:02 ERROR Executor: Exception in task 167.0 in stage 4.0 (TID 1980)
java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 1504 because the size after growing exceeds size limitation 2147483632
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:62)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.writeAlignedBytes(UnsafeWriter.java:175)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:148)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:234)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:86)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.next(ObjectAggregationIterator.scala:33)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I was on Spark 2.3.0 and got the same issue; upgrading to 2.4.0 did not help.

I have a dataset of about 100GB, my data is somewhat skewed, and I am doing a large aggregation that involves a shuffle read. The shape of the job is roughly like this (simplified; the paths, column names, and the exact aggregation function are placeholders):
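    import org.apache.spark.sql.functions.collect_list

    // Simplified shape of my job: a groupBy on a skewed key followed by an
    // aggregation that can build very large rows for the hot keys.
    val events = spark.read.parquet("/data/events")        // placeholder path
    events
      .groupBy("user_id")                                  // skewed key
      .agg(collect_list("payload").as("payloads"))
      .write.parquet("/data/aggregated")                   // placeholder path

Does anybody have any hints or ideas?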

Upvotes: 4

Views: 11205

Answers (1)

vaquar khan

Reputation: 11469

You are getting this error for the following reasons:

1) Not enough partitions: repartition() once the data is loaded in order to redistribute the data (via a shuffle) across the nodes in the cluster. This gives you the parallelism you need for faster processing, and it keeps individual shuffle blocks smaller.
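A minimal sketch of this (the partition count of 2000 is an assumption; pick a number that keeps each partition well under 2GB):

    // Spread the data over more partitions so no single shuffle block
    // approaches the 2GB limit. 2000 is only a starting point to tune.
    val input = spark.read.parquet("/data/events")   // placeholder path
    val repartitioned = input.repartition(2000)

    // For SQL shuffles (groupBy / join), raise the shuffle partition count too:
    spark.conf.set("spark.sql.shuffle.partitions", "2000")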

2) Skewed data due to a poor choice of partition key. The average block size for an unskewed data source is

    (total data size) / (# mappers) / (# reducers)

and the divisor (# mappers) * (# reducers) is usually around 100*100 to 1000*1000, so we typically see single block sizes on the order of KB or MB. With heavy skew, however, one block can receive a disproportionate share of the data and grow past the 2GB limit.
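When the skew comes from a few hot keys, a common mitigation is key salting: split each hot group into several smaller ones and merge the partial results afterwards. A minimal sketch, reusing the input DataFrame from the previous snippet (the column names and the salt factor of 32 are assumptions, and it works as written only for algebraic aggregates like counts or sums):

    import org.apache.spark.sql.functions.{col, floor, rand}

    // Split each logical group into up to 32 physical groups so no single
    // shuffle block has to hold an entire hot key.
    val salted = input.withColumn("salt", floor(rand() * 32))
    val partial = salted
      .groupBy(col("user_id"), col("salt"))   // first-level aggregation
      .count()
    val result = partial
      .groupBy("user_id")                     // merge the salted partials
      .sum("count")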

3) If a single record or block genuinely has to exceed 2GB, the LArray library can handle data larger than 2GB, which is the limit of Java byte arrays and memory-mapped files.

Upvotes: 5
