Reputation: 121
My cluster: 9 slaves, each with 100GB of memory, a 320GB hard drive, and 16 cores. I started 15 Spark executors on each host, so each executor has about 6GB of memory available. My application:
val rdd1 = sc.textFile("a big file in S3, about 200GB, with 14M rows")
val rdd2 = sc.textFile("another big file in S3, about 200GB, with 14M rows")
val par = new HashPartitioner(150)
val rdd1Paired = rdd1.map(regular expression to get one string from each row).filter(drop rows that did not match).partitionBy(par)
val rdd2Paired = rdd2.map(regular expression to get one string from each row).filter(drop rows that did not match).partitionBy(par)
val rdd3 = rdd1Paired.join(rdd2Paired, par)
rdd3.count()
I can see from the Spark UI that the job was scheduled in three stages: filtering for rdd1, filtering for rdd2, and the count. The two filtering stages succeed, but the count stage always fails with OOM. The weird thing is that the job always hangs at the count stage (149/150). I checked the executor that was assigned TID 150 and saw a dramatic increase in shuffle read; after some time it crashes with OOM. I also saw that GC was happening frequently on that executor.
Question: why is only one executor fetching all the data? (The last operation I saw that executor doing was starting its fetch.) To my understanding, once I use the same partitioner for two RDDs they will be co-partitioned, and launching them in the same job guarantees co-location of the data, so the join should happen locally on each executor. The first 149 tasks exit quickly and don't seem to do anything; it looks like the last task is trying to do all the work.
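For reference, this is roughly the check I have in mind for co-partitioning (a minimal sketch using the names above; the printed values are what I would expect, not something I have verified):

// Sketch only: co-partitioned RDDs should expose the same partitioner
// and the same partition count (150, from the HashPartitioner above).
println(rdd1Paired.partitioner)        // expected: Some(HashPartitioner)
println(rdd2Paired.partitioner)        // expected: the same
println(rdd1Paired.partitions.length)  // expected: 150
println(rdd2Paired.partitions.length)  // expected: 150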
Upvotes: 3
Views: 3052
Reputation: 309
The distribution of keys is not uniform. Check to be sure that none of your filter functions have the incidental effect of causing a greater concentration of certain keys.
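For example, something along these lines (a rough sketch, assuming rdd1Paired is a pair RDD as in your code) will show whether a handful of keys dominate:

// Count rows per key and print the heaviest keys.
rdd1Paired
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)      // (key, number of rows with that key)
  .map(_.swap)             // (count, key) so the default ordering sorts by count
  .top(20)                 // the 20 most frequent keys
  .foreach { case (n, k) => println(s"$k -> $n rows") }

If one key accounts for a large fraction of the 14M rows, every row with that key lands in the same partition, and the single task reading that partition does almost all of the shuffle read.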
Upvotes: 2
Reputation: 25929
I guess the distribution of keys that you generate from your data is skewed, so that a lot of them end up in the same partition.
The reason the last task is the one that fails is that it is the largest: it executes the longest and therefore ends up being the last one running.
To solve this, increase the number of partitions and/or beef up your servers.
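As a sketch of the first suggestion (the partition count is illustrative, and parseKeyValue is a stand-in for the regex extraction in your question; note this only helps if the skew is spread across several keys, not if a single key holds most of the rows):

// Sketch only: more, smaller partitions so each join task handles less data.
val par = new HashPartitioner(1350)   // e.g. ~10 partitions per executor instead of ~1
val rdd1Paired = rdd1.map(parseKeyValue).filter(_ != null).partitionBy(par)  // parseKeyValue is hypothetical
val rdd2Paired = rdd2.map(parseKeyValue).filter(_ != null).partitionBy(par)
rdd1Paired.join(rdd2Paired, par).count()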
Upvotes: 0