Reputation: 151
I'm trying to build a basic recommender with Spark and Mahout on Scala. I used the following Mahout repo to compile Mahout against Scala 2.11 and Spark 2.1.2: mahout_fork
To execute my code I use spark-submit. It runs fine when I pass --master local,
but when I try to run it on a cluster with --master spark://vagrant-ubuntu-trusty-64:7077
it always fails with the same error.
Command (runs fine):
/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master local \
target/scala-2.11/recomender-0.0.1.jar
Command (fails):
/opt/spark/bin/spark-submit \
--class 'com.reco.GenerateIndicator' \
--name recomender \
--master spark://vagrant-ubuntu-trusty-64:7077 \
target/scala-2.11/recomender-0.0.1.jar
Dependencies in build.sbt:
name := "recomender"
version := "0.0.1"
scalaVersion := "2.11.11"
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.2"
libraryDependencies ++= {
Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided" ,
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided" ,
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
/* Mahout */
"org.apache.mahout" %% "mahout-spark" % mahoutVersion
exclude("org.apache.spark", "spark-core_2.11")
exclude("org.apache.spark", "spark-sql_2.11"),
"org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
"org.apache.mahout" % "mahout-math" % mahoutVersion,
"org.apache.mahout" % "mahout-hdfs" % mahoutVersion
exclude("com.thoughtworks.xstream", "xstream")
exclude("org.apache.hadoop", "hadoop-client")
)
}
resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
resolvers += Resolver.mavenLocal
// Run configuration
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
runMain in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
fork := true
javaOptions := Seq("-Dspark.master=local")
outputStrategy := Some(StdoutOutput)
/* without this explicit merge strategy code you get a lot of noise from sbt-assembly
complaining about not being able to dedup files */
assemblyMergeStrategy in assembly := {
case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last
case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
case PathList("org", "apache", xs @ _*) => MergeStrategy.last
case PathList("com", "google", xs @ _*) => MergeStrategy.last
case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
case "about.html" => MergeStrategy.rename
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "plugin.properties" => MergeStrategy.last
case "log4j.properties" => MergeStrategy.last
case "overview.html" => MergeStrategy.last
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
/* including scala bloats your assembly jar unnecessarily, and may interfere with
spark runtime */
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
/* you need to be able to undo the "provided" annotation on the deps when running your spark
programs locally i.e. from sbt; this bit reincludes the full classpaths in the compile and run tasks. */
fullClasspath in Runtime := (fullClasspath in (Compile, run)).value
/* Defining format of assembly JAR */
assemblyJarName in assembly := name.value + "-" + version.value +".jar"
Main class:
package com.reco
import org.apache.mahout.sparkbindings.SparkDistributedContext
import org.apache.mahout.sparkbindings._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object GenerateIndicator {
def main(args: Array[String]) {
try {
// Create spark-conf
val sparkConf = new SparkConf().setAppName("recomender")
implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
masterUrl = sparkConf.get("spark.master"),
appName = "recomender",
sparkConf = sparkConf,
// addMahoutJars = true,
addMahoutJars = false
)
implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)
val sparkSession = SparkSession
.builder()
.appName("recomender")
.config(sparkConf)
.getOrCreate()
val lines = returnData()
val linesRdd = sdc.sc.parallelize(lines)
println("...Collecting...")
linesRdd.collect().foreach( item => { // ERROR HERE! on collect()
println(item)
})
// Destroy Spark Session
sparkSession.stop()
sparkSession.close()
} catch {
case e: Exception =>
println(e)
throw new Exception(e)
}
}
def returnData() : Array[String] = {
val lines = Array(
"17,Action",
"17,Comedy",
"17,Crime",
"17,Horror",
"17,Thriller",
"12,Crime",
"12,Thriller",
"16,Comedy",
"16,Romance",
"20,Drama",
"20,Romance",
"7,Drama",
"7,Sci-Fi",
// ... more lines in array ...
"1680,Drama",
"1680,Romance",
"1681,Comedy"
)
lines
}
}
Error:
18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7) on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block data) [duplicate 7]
18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at GenerateIndicator.scala:38, took 5.265593 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
Exception in thread "main" java.lang.Exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at com.reco.GenerateIndicator$.main(GenerateIndicator.scala:49)
at com.reco.GenerateIndicator.main(GenerateIndicator.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:744)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1928)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1941)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1968)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at com.reco.GenerateIndicator$.main(GenerateIndicator.scala:38)
... 10 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Edit:
The error is described in the FAQ on the Mahout web page (second question of the Spark Bindings FAQ), so I tried to build Mahout myself for Scala 2.11 and Spark 2.1.2 as shown on the Mahout site (Building Mahout from Source). But it throws the following error during mvn install or mvn package.
Command:
mvn clean install -Phadoop2 -Dspark.version=2.1.2 -Dscala.version=2.11.11 -Dscala.compat.version=2.11 -Dmaven.test.skip=true
Or with a lower version of Spark:
mvn clean install -Phadoop2 -Dspark.version=2.1.0 -Dscala.version=2.11.11 -Dscala.compat.version=2.11 -Dmaven.test.skip=true
Error during the build:
[INFO] -------------------< org.apache.mahout:mahout-hdfs >--------------------
[INFO] Building Mahout HDFS 0.13.1-SNAPSHOT [4/10]
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Mahout Build Tools ................................. SUCCESS [ 4.121 s]
[INFO] Apache Mahout 0.13.1-SNAPSHOT ...................... SUCCESS [ 0.148 s]
[INFO] Mahout Math ........................................ SUCCESS [ 11.090 s]
[INFO] Mahout HDFS ........................................ FAILURE [ 0.321 s]
[INFO] Mahout Map-Reduce .................................. SKIPPED
[INFO] Mahout Integration ................................. SKIPPED
[INFO] Mahout Examples .................................... SKIPPED
[INFO] Mahout Math Scala bindings ......................... SKIPPED
[INFO] Mahout Spark bindings .............................. SKIPPED
[INFO] Mahout H2O backend 0.13.1-SNAPSHOT ................. SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.343 s
[INFO] Finished at: 2018-08-02T14:12:35+02:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project mahout-hdfs: Could not resolve dependencies for project org.apache.mahout:mahout-hdfs:jar:0.13.1-SNAPSHOT: Failure to find org.apache.mahout:mahout-math:jar:tests:0.13.1-SNAPSHOT in http://repository.apache.org/snapshots was cached in the local repository, resolution will not be reattempted until the update interval of apache.snapshots has elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :mahout-hdfs
The Mahout master repository doesn't work either.
Upvotes: 1
Views: 298
Reputation: 5702
That repo is only a subset of Mahout. It is documented for use in a PredictionIO recommender template called the Universal Recommender. That said, to use it outside the template you must also set the serializer so it recognizes internal Mahout data structures; that may be the serialization problem above. We do that in the UR by setting the Spark config with the following key/value pairs (shown in JSON):
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.kryo.registrator": "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator",
"spark.kryo.referenceTracking": "false",
"spark.kryoserializer.buffer": "300m",
Try passing these to spark-submit or putting them into the context with your driver code.
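For example, if you set them in your driver code, a minimal sketch (reusing the SparkConf from your main class; the 300m buffer is just the example value from the UR config above) would look something like this:
import org.apache.spark.SparkConf
// Configure Kryo and Mahout's registrator before the Mahout/Spark context is created
val sparkConf = new SparkConf()
  .setAppName("recomender")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryoserializer.buffer", "300m")
Equivalently, each pair can be passed to spark-submit as --conf "key=value".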
You have added a build problem above; please try to stick to one problem per SO question.
I would suggest that you use the binaries we host on GitHub with SBT via something like:
val mahoutVersion = "0.13.0"
val sparkVersion = "2.1.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.1.1" % "provided",
"org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided",
"org.xerial.snappy" % "snappy-java" % "1.1.1.7",
// Mahout's Spark libs. They're custom compiled for Scala 2.11
"org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
"org.apache.mahout" %% "mahout-spark" % mahoutVersion
exclude("org.apache.spark", "spark-core_2.11"),
"org.apache.mahout" % "mahout-math" % mahoutVersion,
"org.apache.mahout" % "mahout-hdfs" % mahoutVersion
exclude("com.thoughtworks.xstream", "xstream")
exclude("org.apache.hadoop", "hadoop-client")
// other external libs
)
resolvers += "Temp Scala 2.11 build of Mahout" at "https://github.com/actionml/mahout_2.11/raw/mvn-repo/"
You don't want to build that fork since it requires publishing only the modules needed for the Scala/Samsara part of Mahout to a specially formatted repo that is compatible with SBT.
Mahout people (including me) are working on a release that supports SBT, Scala 2.11 and 2.12, as well as newer versions of Spark. It is working in the Apache main branch and will be released soon. For now, the above may get you going.
Upvotes: 0