Eduardo Liendo

Reputation: 151

spark-submit with Mahout fails in cluster mode (Scala/Java)

I'm trying to build a basic recommender with Spark and Mahout in Scala. I use the following Mahout repo to compile Mahout with Scala 2.11 and Spark 2.1.2: mahout_fork

To execute my code I use spark-submit. It runs fine with --master local, but when I try to run it on a cluster with --master spark://vagrant-ubuntu-trusty-64:7077 it always fails with the same error.

Command (runs fine):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master local \
target/scala-2.11/recomender-0.0.1.jar

Command (fails with the error):

/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master spark://vagrant-ubuntu-trusty-64:7077 \
target/scala-2.11/recomender-0.0.1.jar

Dependencies in build.sbt:

name := "recomender"
version := "0.0.1"
scalaVersion := "2.11.11"
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.2"

libraryDependencies ++= {
  Seq(
    "org.apache.spark"        %% "spark-core" % sparkVersion % "provided" ,
    "org.apache.spark"        %% "spark-sql" % sparkVersion % "provided" ,
    "org.apache.spark"        %% "spark-mllib" % sparkVersion % "provided",
    /* Mahout */
    "org.apache.mahout" %% "mahout-spark" % mahoutVersion
      exclude("org.apache.spark", "spark-core_2.11")
      exclude("org.apache.spark", "spark-sql_2.11"),
    "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
    "org.apache.mahout" % "mahout-math" % mahoutVersion,
    "org.apache.mahout" % "mahout-hdfs" % mahoutVersion
      exclude("com.thoughtworks.xstream", "xstream")
      exclude("org.apache.hadoop", "hadoop-client")
  )
}

resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
resolvers += Resolver.mavenLocal

// Tests Configuration
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
runMain in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
fork := true
javaOptions := Seq("-Dspark.master=local")
outputStrategy := Some(StdoutOutput)

/* without this explicit merge strategy code you get a lot of noise from sbt-assembly
   complaining about not being able to dedup files */
assemblyMergeStrategy in assembly := {
  case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last
  case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("com", "google", xs @ _*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
  case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
  case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
  case "about.html" => MergeStrategy.rename
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case "overview.html" => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

/* including scala bloats your assembly jar unnecessarily, and may interfere with
   spark runtime */
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

/* you need to be able to undo the "provided" annotation on the deps when running your spark
   programs locally i.e. from sbt; this bit reincludes the full classpaths in the compile and run tasks. */
fullClasspath in Runtime := (fullClasspath in (Compile, run)).value

/* Defining format of assembly JAR */
assemblyJarName in assembly := name.value + "-" + version.value +".jar"

Main class:

package com.reco

import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object GenerateIndicator {

  def main(args: Array[String]) {
    try {

      // Create spark-conf
      val sparkConf = new SparkConf().setAppName("recomender")

      implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
        masterUrl = sparkConf.get("spark.master"),
        appName = "recomender",
        sparkConf = sparkConf,
        // addMahoutJars = true,
        addMahoutJars = false
      )

      implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)

      val sparkSession = SparkSession
        .builder()
        .appName("recomender")
        .config(sparkConf)
        .getOrCreate()

      val lines = returnData()

      val linesRdd = sdc.sc.parallelize(lines)

      println("...Collecting...")

      linesRdd.collect().foreach( item => {  // ERROR HERE! on collect()
        println(item)
      })

      // Destroy Spark Session
      sparkSession.stop()
      sparkSession.close()

    } catch {
      case e: Exception =>
        println(e)
        throw new Exception(e)

    }

  }

  def returnData() : Array[String] = {
    val lines = Array(
      "17,Action",
      "17,Comedy",
      "17,Crime",
      "17,Horror",
      "17,Thriller",
      "12,Crime",
      "12,Thriller",
      "16,Comedy",
      "16,Romance",
      "20,Drama",
      "20,Romance",
      "7,Drama",
      "7,Sci-Fi",
      // ... more lines in array ...
      "1680,Drama",
      "1680,Romance",
      "1681,Comedy"
    )
    lines
  }

}

Error:

18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block data) [duplicate 7]
18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at GenerateIndicator.scala:38, took 5.265593 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
Exception in thread "main" java.lang.Exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at com.reco.GenerateIndicator$.main(GenerateIndicator.scala:49)
    at com.reco.GenerateIndicator.main(GenerateIndicator.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:744)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1968)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at com.reco.GenerateIndicator$.main(GenerateIndicator.scala:38)
    ... 10 more
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

/* Edited */

The error is described in the second question of the Spark Bindings FAQ on the Mahout web page, so I tried to build Mahout myself for Scala 2.11 and Spark 2.1.2, as shown on the Mahout page Building Mahout from Source. But it throws the following error during mvn install or mvn package:

Command:

mvn clean install -Phadoop2 -Dspark.version=2.1.2 -Dscala.version=2.11.11 -Dscala.compat.version=2.11 -Dmaven.test.skip=true

Or with a lower version of Spark:

mvn clean install -Phadoop2 -Dspark.version=2.1.0 -Dscala.version=2.11.11 -Dscala.compat.version=2.11 -Dmaven.test.skip=true

ERROR on compile:

[INFO] -------------------< org.apache.mahout:mahout-hdfs >--------------------
[INFO] Building Mahout HDFS 0.13.1-SNAPSHOT                              [4/10]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO] 
[INFO] Mahout Build Tools ................................. SUCCESS [  4.121 s]
[INFO] Apache Mahout 0.13.1-SNAPSHOT ...................... SUCCESS [  0.148 s]
[INFO] Mahout Math ........................................ SUCCESS [ 11.090 s]
[INFO] Mahout HDFS ........................................ FAILURE [  0.321 s]
[INFO] Mahout Map-Reduce .................................. SKIPPED
[INFO] Mahout Integration ................................. SKIPPED
[INFO] Mahout Examples .................................... SKIPPED
[INFO] Mahout Math Scala bindings ......................... SKIPPED
[INFO] Mahout Spark bindings .............................. SKIPPED
[INFO] Mahout H2O backend 0.13.1-SNAPSHOT ................. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.343 s
[INFO] Finished at: 2018-08-02T14:12:35+02:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project mahout-hdfs: Could not resolve dependencies for project org.apache.mahout:mahout-hdfs:jar:0.13.1-SNAPSHOT: Failure to find org.apache.mahout:mahout-math:jar:tests:0.13.1-SNAPSHOT in http://repository.apache.org/snapshots was cached in the local repository, resolution will not be reattempted until the update interval of apache.snapshots has elapsed or updates are forced -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :mahout-hdfs

The Mahout master repository doesn't work.

Upvotes: 1

Views: 298

Answers (1)

pferrel

Reputation: 5702

That repo is only a subset of Mahout. It is documented for use in a PredictionIO recommender template called the Universal Recommender (UR). That said, to use it outside the template you must also set the serializer so that it recognizes Mahout's internal data structures. This may be the cause of the serialization error above. In the UR we do that by setting the Spark config with the following key/value pairs (shown in JSON):

"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
"spark.kryo.referenceTracking": "false",
"spark.kryoserializer.buffer": "300m",

Try passing these to spark-submit or putting them into the context with your driver code.
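As a rough sketch of the driver-code option, the four pairs above could be set on the SparkConf before the context is created. The object name KryoConfSketch and the helper withMahoutKryo are made up for illustration and are not part of Mahout or Spark; only SparkConf.set and the key/value pairs come from the settings listed above. I have not run this against the cluster in the question, so treat it as a starting point.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper object, named only for this example.
object KryoConfSketch {

  // Applies the four key/value pairs from the JSON above to an existing SparkConf.
  def withMahoutKryo(conf: SparkConf): SparkConf =
    conf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
      .set("spark.kryo.referenceTracking", "false")
      .set("spark.kryoserializer.buffer", "300m")

  def main(args: Array[String]): Unit = {
    // Same app name as in the question; the master still comes from spark-submit.
    val sparkConf = withMahoutKryo(new SparkConf().setAppName("recomender"))

    // Print the resulting settings so they can be checked before creating the context.
    sparkConf.getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```

In the question's main class, the conf returned by withMahoutKryo would take the place of sparkConf in the mahoutSparkContext call. The same pairs can instead be passed on the command line with spark-submit's --conf key=value option, one --conf per pair.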

You have added a build problem above; please try to stick to one problem per SO question.

I would suggest that you use the binaries we host on GitHub with SBT via something like:

val mahoutVersion = "0.13.0"

val sparkVersion = "2.1.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
  "org.xerial.snappy" % "snappy-java" % "1.1.1.7",
  // Mahout's Spark libs. They're custom compiled for Scala 2.11
  "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
  "org.apache.mahout" %% "mahout-spark" % mahoutVersion
    exclude("org.apache.spark", "spark-core_2.11"),
  "org.apache.mahout"  % "mahout-math" % mahoutVersion,
  "org.apache.mahout"  % "mahout-hdfs" % mahoutVersion
    exclude("com.thoughtworks.xstream", "xstream")
    exclude("org.apache.hadoop", "hadoop-client")
  // other external libs
)

resolvers += "Temp Scala 2.11 build of Mahout" at "https://github.com/actionml/mahout_2.11/raw/mvn-repo/"

You don't want to build that fork since it requires publishing only the modules needed for the Scala/Samsara part of Mahout to a specially formatted repo that is compatible with SBT.

Mahout people (including me) are working to release a version that supports SBT, Scala 2.11 and 2.12 as well as newer versions of Spark. It is running in the main branch of Apache and will be released soon. For now the above may get you going.

Upvotes: 0
