Reputation: 25447
If I increase the model size of my word2vec model, I start to get this kind of exception in my log:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I tried to write my own "save model" version, which looks like this:
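import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.Word2VecModel
import scala.collection.mutable

// One (word, vector) record; assumed to mirror the Data class Spark's own Word2VecModel.save uses.
case class Data(word: String, vector: Array[Float])

def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {
  println("Saving model as CSV ..")
  val vectorSize = model.getVectors.values.head.size
  println("vectorSize=" + vectorSize)
  val SEPARATOR_TOKEN = " "
  // getVectors builds a Map of every word vector on the driver.
  val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }
  println("Got dataArray ..")
  println("parallelize(dataArray, 10)")
  // Serialize each record as "word v0 v1 ... vN\n".
  val par = sc.parallelize(dataArray, 10).map { d =>
    val sb = new mutable.StringBuilder()
    sb.append(d.word)
    sb.append(SEPARATOR_TOKEN)
    for (v <- d.vector) {
      sb.append(v)
      sb.append(SEPARATOR_TOKEN)
    }
    sb.setLength(sb.length - 1) // drop the trailing separator
    sb.append("\n")
    sb.toString()
  }
  println("repartition(1)")
  // repartition(1) adds a shuffle stage; collect() then reads its output back on the driver.
  val rep = par.repartition(1)
  println("collect()")
  val vectorsAsString = rep.collect()
  println("Collected serialized vectors ..")
  // Text-format header: "<number of words> <vector size>".
  val cfile = new mutable.StringBuilder()
  cfile.append(vectorsAsString.length)
  cfile.append(" ")
  cfile.append(vectorSize)
  cfile.append("\n")
  // CSV header: "word,v0,v1,...".
  val sb = new mutable.StringBuilder()
  sb.append("word,")
  for (i <- 0 until vectorSize) {
    sb.append("v")
    sb.append(i.toString)
    sb.append(",")
  }
  sb.setLength(sb.length - 1)
  sb.append("\n")
  for (vectorString <- vectorsAsString) {
    // CSV rows need commas to match the header; the text format keeps spaces.
    sb.append(vectorString.replace(SEPARATOR_TOKEN, ","))
    cfile.append(vectorString)
  }
  val csvPath = new Path(path + ".csv", "data").toUri.toString
  val txtPath = new Path(path + ".cs", "data").toUri.toString
  println("Saving files to " + csvPath + " and " + txtPath)
  sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(csvPath)
  sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(txtPath)
}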
Apparently it behaves just like Spark's current save implementation: it doesn't work either.
I'd like to get a word2vec model. It works with small files, but not when the model gets larger.
Upvotes: 10
Views: 29933
Reputation: 1495
In short, these configs might help:
--conf spark.blacklist.enabled=true # blacklist bad machines
--conf spark.reducer.maxReqsInFlight=10 # limit concurrent fetch requests from each reducer
--conf spark.shuffle.io.retryWait=10s # increase the wait between retries
--conf spark.shuffle.io.maxRetries=10 # increase the number of retries
--conf spark.shuffle.io.backLog=4096 # increase the TCP connection wait queue length
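If you assemble the job configuration in code rather than typing the flags by hand, the same five settings can be kept as plain key/value pairs. This is a minimal sketch using only the Scala standard library; ShuffleTuning and toSubmitArgs are hypothetical names, not part of Spark.

```scala
// Hypothetical helper: the shuffle-related settings listed above, kept in one place.
object ShuffleTuning {
  val settings: Seq[(String, String)] = Seq(
    "spark.blacklist.enabled"       -> "true", // stop scheduling tasks on bad machines
    "spark.reducer.maxReqsInFlight" -> "10",   // cap concurrent fetch requests per reducer
    "spark.shuffle.io.retryWait"    -> "10s",  // wait longer between fetch retries
    "spark.shuffle.io.maxRetries"   -> "10",   // retry failed fetches more times
    "spark.shuffle.io.backLog"      -> "4096"  // longer accept queue on the shuffle server
  )

  // Render the pairs as spark-submit arguments: --conf key=value ...
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

With spark-core on the classpath, the same pairs could instead be passed to SparkConf.setAll. Note that in Spark 3.1+ the spark.blacklist.* settings were renamed to spark.excludeOnFailure.*.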
Longer explanation below.
MetadataFetchFailedException usually happens when an executor that holds some shuffle output is suddenly killed or terminated: when another executor then tries to fetch the metadata for that shuffle output, the exception is thrown.
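To make that failure mode concrete, here is a toy model of the bookkeeping. It is not Spark's MapOutputTracker API: ToyMapOutputTracker, its methods and ToyMetadataFetchFailed are hypothetical, and only the error text mirrors the message in the question's trace. Each shuffle keeps one slot per map task recording which executor wrote that output; losing the executor empties its slots, and any reducer that then asks for locations fails.

```scala
import scala.collection.mutable

// Toy stand-in for the exception; the message mirrors the one in the question's trace.
final case class ToyMetadataFetchFailed(shuffleId: Int, mapId: Int)
    extends Exception(s"Missing an output location for shuffle $shuffleId")

// Toy tracker: for each shuffle, one slot per map task holding the executor that wrote it.
final class ToyMapOutputTracker {
  private val outputs = mutable.Map.empty[Int, Array[Option[String]]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    outputs(shuffleId) = Array.fill[Option[String]](numMaps)(None)

  def registerMapOutput(shuffleId: Int, mapId: Int, executorId: String): Unit =
    outputs(shuffleId)(mapId) = Some(executorId)

  // When an executor dies, every map output it held disappears with it.
  def removeOutputsOnExecutor(executorId: String): Unit =
    for (slots <- outputs.values; i <- slots.indices if slots(i).contains(executorId))
      slots(i) = None

  // A reducer needs the location of every map output; a single empty slot fails the fetch.
  def locations(shuffleId: Int): Seq[String] =
    outputs(shuffleId).toSeq.zipWithIndex.map {
      case (Some(exec), _) => exec
      case (None, mapId)   => throw ToyMetadataFetchFailed(shuffleId, mapId)
    }
}
```

Registering two outputs for shuffle 6, removing the executor that wrote one of them and then calling locations(6) throws with "Missing an output location for shuffle 6", which is the shape of the error in the question.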
In most cases, this is caused by YARN killing the container for exceeding memory limits, so check the logs to confirm this.
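A minimal sketch for that log check, assuming you already have the executor or NodeManager logs as plain text (for example via yarn logs -applicationId <application id>). YarnKillCheck and its methods are hypothetical names, and the marker string is an assumption: Spark on YARN logs messages starting with "Container killed by YARN for exceeding", but the exact wording after that prefix varies between versions, so verify it against your own logs.

```scala
import scala.io.Source

// Hypothetical helper: find log lines reporting that YARN killed a container.
object YarnKillCheck {
  // Assumed common prefix of the "... memory limits" messages; check your Spark version.
  val Marker = "Container killed by YARN for exceeding"

  // Return every line that reports such a kill.
  def findKills(lines: Iterator[String]): List[String] =
    lines.filter(_.contains(Marker)).toList

  // Convenience wrapper for a log file on local disk; the list is built before closing.
  def findKillsInFile(path: String): List[String] = {
    val src = Source.fromFile(path)
    try findKills(src.getLines()) finally src.close()
  }
}
```

An empty result does not rule out memory problems (the kill may be logged elsewhere), but a hit confirms the container was killed rather than failing on its own.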
The most common fix is to increase the memory overhead (spark.yarn.executor.memoryOverhead, renamed spark.executor.memoryOverhead in Spark 2.3), whose default is 0.1 * executor memory with a minimum of 384 MB. This is too small in most cases; I would suggest making it 0.2 * executor memory. If you have a large number of executors or run other sub-processes, you need an even bigger value.
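The arithmetic is simple, but it helps to see where the 384 MB floor kicks in. A minimal sketch in plain Scala; MemoryOverhead, defaultMb and suggestedMb are hypothetical names, and the 0.10 factor and 384 MB minimum assume a Spark version whose YARN default matches the one described above.

```scala
// Hypothetical helper for sizing the YARN memory overhead (all values in MB).
object MemoryOverhead {
  val MinOverheadMb = 384L

  // Default overhead: max(384 MB, 10% of executor memory).
  def defaultMb(executorMemoryMb: Long): Long =
    math.max(MinOverheadMb, (executorMemoryMb * 0.10).toLong)

  // Overhead at the suggested fraction (20% by default), never below the 384 MB floor.
  def suggestedMb(executorMemoryMb: Long, fraction: Double = 0.20): Long =
    math.max(MinOverheadMb, (executorMemoryMb * fraction).toLong)
}
```

For an 8 GB (8192 MB) executor this gives 819 MB by default versus 1638 MB at 20%, which you would pass as --conf spark.yarn.executor.memoryOverhead=1638 (the value is in megabytes).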
Upvotes: 1
Reputation: 74709
MetadataFetchFailedException is thrown when a MapOutputTracker on an executor could not find requested shuffle map outputs for partitions in local cache and tried to fetch them remotely from the driver's MapOutputTracker.
That could lead to a few conclusions:
Please review the logs for issues reported as "Executor lost" INFO messages, and/or check the Executors page in the web UI to see how the executors behave.
The root cause of executors being lost may also be that the cluster manager has decided to kill ill-behaved executors (which may have used more memory than requested).
See the other question "FetchFailedException or MetadataFetchFailedException when processing big data set" for more insights.
Upvotes: 19