user2179977

Reputation: 656

Why does a Spark task take a long time to find a block locally?

The RDD has 512 equally sized partitions and is 100% cached in memory across 512 executors.

I have a filter-map-collect job with 512 tasks. Sometimes this job completes sub-second. On other occasions 50% of the tasks complete sub-second, 45% of the tasks take 10 seconds and 5% of the tasks take 20 seconds.
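For reference, the job is roughly of the following shape (a minimal sketch; the input path, predicate and mapping function are hypothetical stand-ins, not the actual code):

// Hypothetical sketch of the filter-map-collect job; names and logic are illustrative only.
val rdd = sc.textFile("hdfs:///some/input")
  .repartition(512)
  .cache()

rdd.count() // materialise the 512 cached partitions

val result = rdd
  .filter(line => line.contains("some-key")) // filter
  .map(line => line.length)                  // map
  .collect()                                 // collect results to the driver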

Here is the log from an executor where the task took 20 seconds:

15/12/16 09:44:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5312 
15/12/16 09:44:37 INFO executor.Executor: Running task 215.0 in stage 17.0 (TID 5312) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 10 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(1777) called with curMem=908793307, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 1777.0 B, free 4.7 GB) 
15/12/16 09:44:37 INFO broadcast.TorrentBroadcast: Reading broadcast variable 10 took 186 ms 
15/12/16 09:44:37 INFO storage.MemoryStore: ensureFreeSpace(3272) called with curMem=908795084, maxMem=5927684014 
15/12/16 09:44:37 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 3.2 KB, free 4.7 GB) 
15/12/16 09:44:57 INFO storage.BlockManager: Found block rdd_5_215 locally 
15/12/16 09:44:57 INFO executor.Executor: Finished task 215.0 in stage 17.0 (TID 5312). 2074 bytes result sent to driver 

So it appears the 20 seconds is spent finding the local block. Looking at the logs for other slow tasks shows that they are all delayed for the same reason. My understanding is that a local block is one held within the same JVM instance, so I don't understand why locating it takes so long.

Since the lag is always either exactly 10 seconds or exactly 20 seconds, I suspect it's due to a 10-second timeout on some listener, or something like that. If that is true then I guess my options are either to find out why it's timing out and fix it, or to make the timeout shorter so it retries more frequently.

Why does the task take so long to find a local block and how can I resolve this?

Update: Adding DEBUG log for org.apache.spark.storage.
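(For anyone reproducing this, DEBUG output for that package can be enabled with a log4j.properties entry along these lines, assuming the stock log4j setup:)

log4j.logger.org.apache.spark.storage=DEBUG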

16/02/01 12:14:07 INFO CoarseGrainedExecutorBackend: Got assigned task 3029
16/02/01 12:14:07 INFO Executor: Running task 115.0 in stage 9.0 (TID 3029)
16/02/01 12:14:07 DEBUG Executor: Task 3029's epoch is 1
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6 not registered locally
16/02/01 12:14:07 INFO TorrentBroadcast: Started reading broadcast variable 6
16/02/01 12:14:07 DEBUG TorrentBroadcast: Reading piece broadcast_6_piece0 of broadcast_6
16/02/01 12:14:07 DEBUG BlockManager: Getting local block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Block broadcast_6_piece0 not registered locally
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 as bytes
16/02/01 12:14:07 DEBUG BlockManager: Getting remote block broadcast_6_piece0 from BlockManagerId(385, node1._.com, 54162)
16/02/01 12:14:07 DEBUG TransportClient: Sending fetch chunk request 0 to node1._.com:54162
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 2017.0 B, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManagerMaster: Updated info of block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Told master about block broadcast_6_piece0
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6_piece0 locally took  2 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6_piece0 without replication took  2 ms
16/02/01 12:14:07 INFO TorrentBroadcast: Reading broadcast variable 6 took 87 ms
16/02/01 12:14:07 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.6 KB, free 807.3 MB)
16/02/01 12:14:07 DEBUG BlockManager: Put block broadcast_6 locally took  1 ms
16/02/01 12:14:07 DEBUG BlockManager: Putting block broadcast_6 without replication took  1 ms
16/02/01 12:14:17 DEBUG CacheManager: Looking for partition rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Getting local block rdd_5_115
16/02/01 12:14:17 DEBUG BlockManager: Level for block rdd_5_115 is StorageLevel(false, true, false, true, 1)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 4
16/02/01 12:14:17 DEBUG BlockManager: Getting block rdd_5_115 from memory
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 4
16/02/01 12:14:17 INFO BlockManager: Found block rdd_5_115 locally
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4 of size 3680 dropped from memory (free 5092230668)
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_4_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_4_piece0 of size 2017 dropped from memory (free 5092232685)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_4_piece0
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 4, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115
16/02/01 12:14:17 INFO Executor: Finished task 115.0 in stage 9.0 (TID 3029). 2164 bytes result sent to driver
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing broadcast 5
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5_piece0
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5_piece0 of size 2017 dropped from memory (free 5092234702)
16/02/01 12:14:17 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Told master about block broadcast_5_piece0
16/02/01 12:14:17 DEBUG BlockManager: Removing block broadcast_5
16/02/01 12:14:17 DEBUG MemoryStore: Block broadcast_5 of size 3680 dropped from memory (free 5092238382)
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Done removing broadcast 5, response is 2
16/02/01 12:14:17 DEBUG BlockManagerSlaveEndpoint: Sent response: 2 to node2._.com:45115

Upvotes: 18

Views: 6002

Answers (2)

ACH

Reputation: 33

How many total cores are you allocating to your Spark application? This might happen if you are allocating 256 cores and spark.locality.wait is set to 10 seconds: with fewer cores than tasks, the later waves of tasks wait out the locality timeout once or twice before being scheduled, which would show up as delays of exactly 10 or 20 seconds.

I don't know your environment, but it seems you have too many executors. Use only a few executors (depending on how powerful your compute nodes are) and give each executor multiple cores. In short, instead of having lots of processes with 1 thread each, have a few processes with many threads each.
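As a rough sketch of that (the instance, core and memory values below are placeholders, not recommendations for this specific cluster), the relevant settings can go on the SparkConf, and spark.locality.wait can be lowered at the same time if the locality timeout turns out to be the cause:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: fewer executors with several cores each,
// and a shorter locality wait so the scheduler falls back to
// non-local slots sooner. All values are placeholders.
val conf = new SparkConf()
  .setAppName("filter-map-collect")
  .set("spark.executor.instances", "32")  // placeholder
  .set("spark.executor.cores", "16")      // placeholder
  .set("spark.executor.memory", "16g")    // placeholder
  .set("spark.locality.wait", "1s")       // default is 3s

val sc = new SparkContext(conf)

The same settings can equally be passed as --conf options to spark-submit instead of being hard-coded.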

Upvotes: 1

Stephen Carman

Reputation: 997

The only thing that seems to stand out to me is that you have replication turned on via your storage level, StorageLevel(false, true, false, true, 1).

Since you have 512 partitions across 512 executors, it may be replicating the blocks across every executor, which may cause that slowdown at the end. I'd try turning off replication and see what that does for the performance.
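If you want to experiment with that, here is a minimal sketch of setting the level explicitly (in the StorageLevel constructor the flags are useDisk, useMemory, useOffHeap, deserialized, and the last argument is the replication factor, so 1 means a single copy and 2 means one extra replica):

import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY, deserialized, single copy (replication factor 1).
val singleCopy = StorageLevel(false, true, false, true, 1)

// The same level but with one replica held on another executor.
val replicated = StorageLevel(false, true, false, true, 2)

rdd.persist(singleCopy) // `rdd` stands for whichever RDD is being cached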

Upvotes: 1
