Reputation: 412
I want to create a standalone Scala application that uses a custom ReadConfig to read from MongoDB, based on the example code from the MongoDB website.
When I run sbt package, I get a compile error. I suspect it is related to how I create the SparkSession. Can you please give me a hint to fix it?
My build.sbt content:
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
"org.apache.spark" %% "spark-sql" % "2.4.1"
)
Firstapp.scala code:
package com.mongodb
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig,WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document
object FirstApp {
def main(args: Array[String]) {
val sc = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
println(customRdd.first.toJson)
}
}
and the error after running sbt package
value toJson is not a member of org.apache.spark.sql.Row
[error] println(customRdd.first.toJson)
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 10 s, completed Jun 10, 2020 6:10:50 PM
EDIT1:
I tried the suggested solution; it now compiles, but it fails at runtime. The build.sbt
content is the same as above. I changed SimpleApp.scala
to:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
object FirstApp {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
val sc = spark.sparkContext
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
}
}
The result of running spark-submit:
$ spark-submit --class "FirstApp" --master local[4] target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar
20/06/12 07:09:53 WARN Utils: Your hostname, Project resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
20/06/12 07:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/06/12 07:09:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/12 07:09:54 INFO SparkContext: Running Spark version 2.4.5
20/06/12 07:09:54 INFO SparkContext: Submitted application: MongoSparkConnectorIntro
20/06/12 07:09:55 INFO SecurityManager: Changing view acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing view acls groups to:
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls groups to:
20/06/12 07:09:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sadegh); groups with view permissions: Set(); users with modify permissions: Set(sadegh); groups with modify permissions: Set()
20/06/12 07:09:55 INFO Utils: Successfully started service 'sparkDriver' on port 33031.
20/06/12 07:09:55 INFO SparkEnv: Registering MapOutputTracker
20/06/12 07:09:55 INFO SparkEnv: Registering BlockManagerMaster
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/06/12 07:09:55 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7405e1be-08e8-4f58-b88e-b8f01f8fe87e
20/06/12 07:09:55 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/06/12 07:09:55 INFO SparkEnv: Registering OutputCommitCoordinator
20/06/12 07:09:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/06/12 07:09:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
20/06/12 07:09:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4041
20/06/12 07:09:56 INFO SparkContext: Added JAR file:/Folder/target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar at spark://10.0.2.15:33031/jars/root-2_2.11-0.1.0-SNAPSHOT.jar with timestamp 1591938596069
20/06/12 07:09:56 INFO Executor: Starting executor ID driver on host localhost
20/06/12 07:09:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42815.
20/06/12 07:09:56 INFO NettyBlockTransferService: Server created on 10.0.2.15:42815
20/06/12 07:09:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/06/12 07:09:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:42815 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 42815, None)
Exception in thread "main" java.lang.NoClassDefFoundError: com/mongodb/spark/config/ReadConfig$
at FirstApp$.main(SimpleApp.scala:16)
at FirstApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.config.ReadConfig$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 14 more
20/06/12 07:09:56 INFO SparkContext: Invoking stop() from shutdown hook
20/06/12 07:09:56 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4041
20/06/12 07:09:56 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/12 07:09:56 INFO MemoryStore: MemoryStore cleared
20/06/12 07:09:56 INFO BlockManager: BlockManager stopped
20/06/12 07:09:56 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/12 07:09:56 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/12 07:09:56 INFO SparkContext: Successfully stopped SparkContext
20/06/12 07:09:56 INFO ShutdownHookManager: Shutdown hook called
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-7f90ac08-403c-4a3f-bb45-ea24a347c380
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-78cb32aa-c6d1-4ba4-b94f-16d3761d181b
EDIT2:
I added .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
to SimpleApp.scala, but the error remains the same as in the EDIT1 section:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
object FirstApp {
def main(args: Array[String]) {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
.getOrCreate()
val sc = spark.sparkContext
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
}
}
Upvotes: 5
Views: 1695
Reputation: 685
I don't know if you were able to solve your issue, but this is how it worked for me. You need to pass --packages to spark-submit; it resolves the connector and its transitive dependencies (including the MongoDB Java driver) at submit time and adds them to the driver and executor classpaths, which sbt package alone does not do:
spark-submit --class "FirstApp" --master local[4] --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar
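As an alternative (my addition, not part of the original answer, and assuming a standard Spark installation layout), the same dependency can be declared once in conf/spark-defaults.conf so every spark-submit picks it up without the extra flag:
# conf/spark-defaults.conf -- hypothetical entry; adjust the Scala suffix and version to your build
spark.jars.packages  org.mongodb.spark:mongo-spark-connector_2.11:2.4.1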
Upvotes: 0
Reputation: 23109
Here are the detailed steps to create a Scala project that reads data from MongoDB with Apache Spark.
You can create the project with an IDE, or manually with the following files:
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
build.sbt
name := "SparkMongo"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.1"
val mongoSparkVersion = "2.4.1"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % mongoSparkVersion ,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
assemblyJarName in assembly := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar"
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
SparkMongo/src/main/scala/com/test/FirstMongoSparkApp.scala
package com.test
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
object FirstMongoSparkApp extends App {
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkProject")
.config("spark.mongodb.input.uri", "mongodb://localhost/test.cities")
.config("spark.mongodb.output.uri", "mongodb://localhost/test.outputCities")
.getOrCreate()
import spark.implicits._
val readConfig = ReadConfig(Map("collection" -> "cities", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(spark.sparkContext)))
val customRdd = MongoSpark.load(spark.sparkContext, readConfig)
customRdd.toDF().show(false)
}
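Since the configuration above also sets spark.mongodb.output.uri, a possible follow-up (my addition, not part of the original answer) is to write the loaded data back to the output collection. A minimal sketch, assuming the spark session and customRdd defined above and the connector's MongoSpark.save helper:
// Inside FirstMongoSparkApp, after customRdd.toDF().show(false):
// convert the Document RDD to a DataFrame and write it to the collection
// configured by spark.mongodb.output.uri (test.outputCities in this setup).
val citiesDf = customRdd.toDF()
MongoSpark.save(citiesDf)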
Now you can run sbt assembly, which will generate a jar file SparkMongo_2.11-0.1.jar.
You can run the jar file as:
spark-submit --class "com.test.FirstMongoSparkApp" --master "local" target/scala-2.11/SparkMongo_2.11-0.1.jar
To run without issues, make sure you have the same version of Spark as in the dependency (in this case 2.4.1) and MongoDB version 2.6+.
Upvotes: 1
Reputation: 4481
I think your problem is that you are using a SparkSession where a SparkContext is expected, but they are not the same thing. If you make sc the underlying SparkContext (spark.sparkContext), everything will compile:
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
val sc = spark.sparkContext
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
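If you would rather keep the SparkSession-based load from the original question, here is a minimal sketch of the DataFrame route (my addition, assuming the spark and readConfig values defined above): MongoSpark.load(spark, readConfig) returns a DataFrame of Rows, and a Row has no toJson, so convert the Dataset to JSON strings instead:
// Loading through the SparkSession yields a DataFrame (Dataset[Row]), not an RDD[Document]
val df = MongoSpark.load(spark, readConfig)
println(df.count())
// Dataset.toJSON produces a Dataset[String] with one JSON document per row
println(df.toJSON.first())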
Upvotes: 0