Reputation: 9328
I'm training a Word2Vec model with a fairly large number of individual terms (~100k), on a 200-dimension basis.
Spark's typical W2V implementation currently adds up to a memory usage mainly composed of the vectors for each word, that is: numberOfDimensions * sizeof(float) * numberOfWords. Do the math: the order of magnitude of the above is ~100MB, give or take.
Considering I'm still working on my tokenizers and still benchmarking for the optimal vector size, I'm actually doing computations on a dictionary of 75k-150k words and from 100 to 300 dimensions, so let's just say the model can reach ~500MB.
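Back-of-the-envelope, that footprint can be sketched in a few lines of plain Scala (the 4-byte Float is the only assumption; the raw array is a lower bound, since the wordIndex map and serialization overhead come on top):

```scala
// Raw size of the flat vector array: words * dims * sizeof(Float).
def w2vBytes(numberOfWords: Long, numberOfDimensions: Long): Long =
  numberOfWords * numberOfDimensions * 4L

val baseline = w2vBytes(100000, 200) // 80,000,000 bytes, i.e. ~80 MB
val upper    = w2vBytes(150000, 300) // 180,000,000 bytes, i.e. ~180 MB
```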
Now everything is fine, up until the saving of this model, which is currently implemented in Spark this way:
override protected def saveImpl(path: String): Unit = {
  DefaultParamsWriter.saveMetadata(instance, path, sc)
  val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq)
  val dataPath = new Path(path, "data").toString
  sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
That is: a dataframe of 1 row is created, the row containing one big f(l)at array of all the vectors. The dataframe is saved as parquet. That's fine... unless... you have to ship it to an executor. Which you do in cluster mode.
This ends up blowing up the job, with a stacktrace like so :
16/11/28 11:29:00 INFO scheduler.DAGScheduler: Job 3 failed: parquet at Word2Vec.scala:311, took 5,208453 s
16/11/28 11:29:00 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure:
Serialized task 32:5 was 204136673 bytes,
which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes).
Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
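The repartition(1) above explains the numbers in that stacktrace: the whole model becomes one serialized task. A sketch of how a save-time partition count could be derived instead (the function, the 2x serialization-overhead guess, and the hard-coded default limit are my assumptions, not Spark's actual code):

```scala
// spark.rpc.message.maxSize defaults to 128 MB (134217728 bytes).
// The stacktrace shows ~204 MB serialized for ~100 MB of raw floats,
// so budget roughly 2x for Java serialization overhead.
def numPartitionsForSave(numWords: Long,
                         vectorSize: Int,
                         maxRpcBytes: Long = 134217728L,
                         overheadFactor: Double = 2.0): Int = {
  val rawBytes  = numWords * vectorSize * 4L          // Float = 4 bytes
  val estimated = (rawBytes * overheadFactor).toLong
  math.max(1, math.ceil(estimated.toDouble / maxRpcBytes).toInt)
}

val parts = numPartitionsForSave(100000, 250) // 2 partitions instead of 1
```

Spreading the vectors over that many rows/partitions at write time would keep each serialized task under the cap.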
Simple code to reproduce (you can't just run it in a local spark-shell, though; you need to ship it to a cluster):
object TestW2V {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TestW2V").getOrCreate()
    import spark.implicits._

    // Alphabet
    val randomChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTYVWXTZ".toCharArray
    val random = new java.util.Random()

    // Dictionary
    def makeWord(wordLength: Int): String =
      new String((0 until wordLength).map(_ => randomChars(random.nextInt(randomChars.length))).toArray)
    val randomWords = for (wordIndex <- 0 to 100000) // make approx. 100 thousand distinct words
      yield makeWord(random.nextInt(10) + 5)

    // Corpus (make it fairly non-trivial)
    def makeSentence(numberOfWords: Int): Seq[String] =
      (0 until numberOfWords).map(_ => randomWords(random.nextInt(randomWords.length)))
    val allWordsDummySentence = randomWords // all words at least once
    val randomSentences = for (sentenceIndex <- 0 to 100000)
      yield makeSentence(random.nextInt(10) + 5)
    val corpus: Seq[Seq[String]] = allWordsDummySentence +: randomSentences

    // Train a W2V model on the corpus
    val df = spark.createDataFrame(corpus.map(Tuple1.apply))
    import org.apache.spark.ml.feature.Word2Vec
    val w2v = new Word2Vec().setVectorSize(250).setMinCount(1).setInputCol("_1").setNumPartitions(4)
    val w2vModel = w2v.fit(df)
    w2vModel.save("/home/Documents/w2v")
    spark.stop
  }
}
Now... I understand the internals well enough, I guess, to understand why this happens. The question is: what can I do about it?
The spark.mllib.feature.Word2VecModel (the "deprecated" RDD-based 1.x version) has a public constructor I could work with manually, rolling my own, properly partitioned save/load implementations. But the new spark.ml.feature.Word2VecModel does not provide a public constructor that I can see. Considering the Spark team fixed this JIRA: https://issues.apache.org/jira/browse/SPARK-11994 (which is for the 1.x API), I guess they double-checked the 2.0 API, and I'm doing something wrong :-).
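If rolling my own on top of that public 1.x constructor, the core of a partitioned save/load would just be chunking the word -> vector map and merging it back; a minimal sketch (the helper names are mine, and the per-chunk parquet write/read is omitted):

```scala
// Split the vectors into chunks small enough that no single serialized
// task exceeds the RPC limit; each chunk is persisted independently.
def chunkVectors(vectors: Map[String, Array[Float]],
                 wordsPerChunk: Int): Seq[Map[String, Array[Float]]] =
  vectors.toSeq.grouped(wordsPerChunk).map(_.toMap).toSeq

// On load, read every chunk back and merge before handing the map to
// the public spark.mllib.feature.Word2VecModel constructor.
def mergeChunks(chunks: Seq[Map[String, Array[Float]]]): Map[String, Array[Float]] =
  chunks.foldLeft(Map.empty[String, Array[Float]])(_ ++ _)
```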
For now I guess I can run it in local mode and avoid the final task serialization, but this is a temporary solution at best, which will not be possible at production level (data accessibility and all...). Or crank the RPC size up to 512MB, sure...
PS: The above happens with Spark 2.0.1, on a Spark standalone cluster (not reproducible in local mode).
I'd usually post this kind of message to a users mailing list, but seeing that Spark encourages the use of SO, here goes...
Upvotes: 4
Views: 3787
Reputation: 31
I have exactly the same experience as you. It works fine locally, but in cluster mode it dies unless I crank the RPC size up to 512MB as you suggested, i.e. passing through
spark.rpc.message.maxSize=512
gets me by.
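For reference, the same setting can also be baked into the session builder instead of being passed at submit time (a stopgap sketch; the value is in MB and the default is 128):

```scala
import org.apache.spark.sql.SparkSession

// Raise the RPC message cap so the single oversized save task fits.
// This works around the symptom; the repartition(1) remains the cause.
val spark = SparkSession.builder()
  .appName("TestW2V")
  .config("spark.rpc.message.maxSize", "512")
  .getOrCreate()
```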
And I also agree the saving implementation looks suspect, especially the
repartition(1)
bit.
Upvotes: 3