Reputation: 81
I have a Spark benchmark that includes a TeraSort, and it runs properly when the data is only a few hundred GB. But when I generate more data, such as 1 TB, it goes wrong at a certain step. The following is my code:
import org.apache.spark.rdd._
import org.apache.spark._
import org.apache.spark.SparkContext._

object ScalaTeraSort {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: ScalaTeraSort <INPUT_HDFS> <OUTPUT_HDFS>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("ScalaTeraSort")
    val sc = new SparkContext(sparkConf)

    // Each input line is a TeraSort record: the first 10 characters are the key,
    // the remainder is the value
    val file = sc.textFile(args(0))
    val data = file.map(line => (line.substring(0, 10), line.substring(10)))
      .sortByKey()
      .map { case (k, v) => k + v }
    data.saveAsTextFile(args(1))

    sc.stop()
  }
}
This code mainly consists of 3 steps: sortByKey, map and saveAsTextFile. There seems to be nothing wrong with the first two steps, but the third step fails every time and then the second step gets retried. The third step fails with "FetchFailed(BlockManagerId(40, sr232, 44815, 0), shuffleId=0, mapId=11825, reduceId=0)".
Upvotes: 4
Views: 957
Reputation: 81
I found out the reason. The essential problem is: java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
That is to say, you have to set the property "spark.core.connection.ack.wait.timeout" to a bigger value; by default it is 60 seconds. Otherwise, the stage will fail because the executor does not respond within that time.
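As a minimal sketch of how this could be set in the driver code (the value of 200 seconds is just an illustrative choice, not something from the original post; tune it for your cluster):

val sparkConf = new SparkConf()
  .setAppName("ScalaTeraSort")
  // Raise the ack timeout above the 60-second default so large shuffles
  // are not marked as failed while executors are still busy.
  // 200 is only an example value.
  .set("spark.core.connection.ack.wait.timeout", "200")
val sc = new SparkContext(sparkConf)

The same property can also be passed at submit time, e.g. spark-submit --conf spark.core.connection.ack.wait.timeout=200, which avoids recompiling the job.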
Upvotes: 4