GPI

Reputation: 9328

Spark Word2VecModel exceeds max RPC size for saving

I'm training a Word2Vec model with a fairly large number of individual terms (~100k), using 200-dimensional vectors.

The memory footprint of Spark's Word2Vec model is dominated by the vectors for each word, that is: numberOfDimensions * sizeof(float) * numberOfWords. Do the math and the order of magnitude is around 100MB, give or take.
Since I'm still working on my tokenizers and still benchmarking for the optimal vector size, I'm actually computing on dictionaries of 75k-150k words and from 100 to 300 dimensions, so let's just say the model can reach ~500MB.
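A quick back-of-the-envelope check (plain Scala; the word and dimension counts are just the upper end of the ranges above, and this only accounts for the raw float data):

// Rough in-memory size of the vector table: words * dimensions * 4 bytes per Float
val words = 150000L
val dims  = 300L
val bytes = words * dims * 4L
println(s"~${bytes / (1024 * 1024)} MB") // ~171 MB before indexes and serialization overhead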

Now everything is fine, up until saving this model, which is currently implemented in Spark this way:

override protected def saveImpl(path: String): Unit = {
  DefaultParamsWriter.saveMetadata(instance, path, sc)
  val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq)
  val dataPath = new Path(path, "data").toString
  sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}

That is: a DataFrame of a single row is created, the row containing one big f(l)at array of all the vectors. The DataFrame is saved as Parquet. That's fine... unless you have to ship it to an executor, which you do in cluster mode.

This ends up blowing up the job, with a stack trace like this:

16/11/28 11:29:00 INFO scheduler.DAGScheduler: Job 3 failed: parquet at Word2Vec.scala:311, took 5,208453 s  
16/11/28 11:29:00 ERROR datasources.InsertIntoHadoopFsRelationCommand: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: 
    Serialized task 32:5 was 204136673 bytes, 
    which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes).
    Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)

Simple code to reproduce (you cannot run it in a local spark-shell, though; you need to ship it to a cluster):

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.sql.SparkSession

object TestW2V {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TestW2V").getOrCreate()
    import spark.implicits._

    // Alphabet
    val randomChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ".toCharArray
    val random = new java.util.Random()

    // Dictionary
    def makeWord(wordLength: Int): String =
      new String((0 until wordLength).map(_ => randomChars(random.nextInt(randomChars.length))).toArray)
    val randomWords = for (wordIndex <- 0 to 100000) // make approx. 100 thousand distinct words
                      yield makeWord(random.nextInt(10) + 5)

    // Corpus (make it fairly non-trivial)
    def makeSentence(numberOfWords: Int): Seq[String] =
      (0 until numberOfWords).map(_ => randomWords(random.nextInt(randomWords.length)))
    val allWordsDummySentence = randomWords // all words at least once
    val randomSentences = for (sentenceIndex <- 0 to 100000)
                          yield makeSentence(random.nextInt(10) + 5)
    val corpus: Seq[Seq[String]] = allWordsDummySentence +: randomSentences

    // Train a W2V model on the corpus
    val df = spark.createDataFrame(corpus.map(Tuple1.apply))
    val w2v = new Word2Vec().setVectorSize(250).setMinCount(1).setInputCol("_1").setNumPartitions(4)
    val w2vModel = w2v.fit(df)
    w2vModel.save("/home/Documents/w2v")

    spark.stop()
  }
}

Now... I understand the internals well enough, I guess, to see why this happens. The question is:

Considering the Spark team fixed this JIRA: https://issues.apache.org/jira/browse/SPARK-11994 (which is for the 1.x API), I guess they double-checked the 2.0 API, and I'm doing something wrong :-).

For now I guess I can run it in local mode and avoid the final task serialization, but this is a temporary solution at best, which will not be possible at production level (data accessibility and all...). Or crank up the RPC size to 512MB, sure...
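For the record, that last workaround would look something like this at session-build time (just a sketch; the value is in MB and 512 is the figure mentioned above):

val spark = SparkSession.builder()
  .appName("TestW2V")
  .config("spark.rpc.message.maxSize", "512") // default is 128 MB
  .getOrCreate()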

PS: The above happens with Spark 2.0.1, on a Spark standalone cluster (not reproducible in local mode).
I'd usually post this kind of message to a users mailing list, but seeing that Spark encourages the use of SO, here goes...

Upvotes: 4

Views: 3787

Answers (1)

Stefan Krawczyk

Reputation: 31

I have exactly the same experience as you. It works fine locally, but in cluster mode it dies unless I crank up the RPC size to 512MB as you suggested.

i.e. passing spark.rpc.message.maxSize=512 gets me by.
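Concretely, I add it to the submit command, roughly like this (the class and jar names are placeholders for my actual job):

spark-submit \
  --conf spark.rpc.message.maxSize=512 \
  --class com.example.TestW2V \
  my-app.jar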

And I also agree the saving implementation looks suspect, especially with the repartition(1) bit.

Upvotes: 3
