anonygrits

Reputation: 1499

Spark Word2Vec example using text8 file

I'm trying to run this example from spark.apache.org (code is below & the entire tutorial is here: https://spark.apache.org/docs/latest/mllib-feature-extraction.html) using the text8 file that they reference on their site (http://mattmahoney.net/dc/text8.zip):

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

val input = sc.textFile("/Users/rkita/Documents/Learning/random/spark/MLlib/examples/text8",4).map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("china", 40)

for((synonym, cosineSimilarity) <- synonyms) {
  println(s"$synonym $cosineSimilarity")
}

// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")

I am working with Spark on my Mac (2 cores, 8 GB RAM), and I think I've set the memory allocations correctly in my spark-env.sh file with the following:

export SPARK_EXECUTOR_MEMORY=4g
export SPARK_WORKER_MEMORY=4g

When I try to fit the model, I keep getting Java heap space errors. I got the same result in Python, and I also tried increasing the Java memory sizes using JAVA_OPTS.

The file is only 100MB, so I think somehow my memory settings are not correct, but I'm not sure if that's the root cause.

Has anyone else tried this example on a laptop?

I can't put the file on our company servers because we're not supposed to import external data, so I'm reduced to working on my personal laptop. If you have any suggestions, I'd appreciate hearing them. Thx!

Upvotes: 4

Views: 4624

Answers (2)

njustice

Reputation: 46

sc.textFile splits on newlines only, and text8 contains no newlines.

You are creating a 1-row RDD. .map(line => line.split(" ").toSeq) creates another 1-row RDD of type RDD[Seq[String]].

Word2Vec works best with one sentence per RDD row (and this should also avoid the Java heap errors). Unfortunately, text8 has had its periods stripped out, so you can't just split on them, but you can find the raw version here, as well as the perl script used to process it, and it isn't hard to edit the script so it doesn't remove periods.
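
Something along these lines might work as a rough, untested sketch (assuming the spark-shell's sc; the 1000-word chunk size and the ./text8 path are arbitrary choices): split text8 into individual words and regroup them into fixed-size chunks, so the RDD contains many short "sentences" instead of a single huge row.

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.Word2Vec

// Split the single line of text8 into words, then group every 1000 words
// into one "sentence" so Word2Vec sees many short rows.
val chunked = sc.textFile("./text8")
  .flatMap(_.split(" "))
  .filter(_.nonEmpty)
  .zipWithIndex()
  .map { case (word, idx) => (idx / 1000, word) }
  .groupByKey()
  .map { case (_, words) => words.toSeq }   // RDD[Seq[String]]

val model = new Word2Vec().fit(chunked)

Fixed-size chunks are not real sentences, but they should keep each row small enough to avoid the heap problem while you sort out proper sentence boundaries.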

Upvotes: 0

blackbox

Reputation: 671

First of all, I am a newcomer to Spark, so others may have quicker or better solutions. I ran into the same difficulties running this sample code. I managed to make it work, mainly by:

  1. Running my own Spark cluster on my machine: use the start scripts in the /sbin/ directory of your Spark installation. To do so, you have to configure the conf/spark-env.sh file according to your needs. DO NOT use the 127.0.0.1 IP for Spark.
  2. Compiling and packaging the Scala code as a jar (sbt package), then providing it to the cluster (see addJar(...) in the Scala code). It seems possible to provide compiled code to Spark via the classpath / extra classpath, but I have not tried that yet. A minimal build.sbt sketch is shown after this list.
  3. Setting the executor memory and driver memory (see the Scala code).
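
If it helps, here is a minimal build.sbt sketch matching the jar name used with addJar below; the Spark version number is an assumption, so adjust it to your installation:

// Hypothetical build.sbt; the Spark version is a guess, adjust as needed.
name := "sparkling"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.3.1",
  "org.apache.spark" %% "spark-mllib" % "1.3.1"
)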

spark-env.sh:

export SPARK_MASTER_IP=192.168.1.53
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080

export SPARK_DAEMON_MEMORY=1G
# Worker: 1 per server
# Number of worker instances to run on each machine (default: 1). 
# You can make this more than 1 if you have very large machines and would like multiple Spark worker processes. 
# If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, 
# or else each worker will try to use all the cores.
export SPARK_WORKER_INSTANCES=2
# Total number of cores to allow Spark applications to use on the machine (default: all available cores).
export SPARK_WORKER_CORES=7

# Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g 
# (default: total memory minus 1 GB); 
# note that each application's individual memory is configured using its spark.executor.memory property.
export SPARK_WORKER_MEMORY=8G
export SPARK_WORKER_DIR=/tmp

# Executor: 1 per application running on the server
# export SPARK_EXECUTOR_INSTANCES=4
# export SPARK_EXECUTOR_MEMORY=4G

export SPARK_SCALA_VERSION="2.10"

Scala file to run the example:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}

object SparkDemo {

  def log[A](key: String)(job: => A) = {
    val start = System.currentTimeMillis
    val output = job
    println("===> %s in %s seconds"
      .format(key, (System.currentTimeMillis - start) / 1000.0))
    output
  }

  def main(args: Array[String]): Unit = {

    val modelName = "w2vModel"

    val sc = new SparkContext(
      new SparkConf()
      .setAppName("SparkDemo")
      .set("spark.executor.memory", "8G")
      .set("spark.driver.maxResultSize", "16G")
      .setMaster("spark://192.168.1.53:7077") // ip of the spark master.
      // .setMaster("local[2]") // does not work... workers loose contact with the master after 120s
    )

    // take a look at the target folder if you are unsure how the jar is named
    // one-liner to compile / run: sbt package && sbt run
    sc.addJar("./target/scala-2.10/sparkling_2.10-0.1.jar")

    val input = sc.textFile("./text8").map(line => line.split(" ").toSeq)

    val word2vec = new Word2Vec()

    val model = log("compute model") { word2vec.fit(input) }
    log ("save model") { model.save(sc, modelName) }

    val synonyms = model.findSynonyms("china", 40)
    for((synonym, cosineSimilarity) <- synonyms) {
      println(s"$synonym $cosineSimilarity")
    }

    val model2 = log("reload model") { Word2VecModel.load(sc, modelName) }
  }
}

Upvotes: 2
