Sadegh

Reputation: 412

Read from MongoDB in Scala

I want to create a standalone Scala application that uses a custom setting to read from MongoDB, based on the example code from the MongoDB website.

When I run sbt package, I get some errors. I guess they are related to the way I create the SparkSession. Can you please give me a hint on how to fix it?

My build.sbt content:

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1"
)

FirstApp.scala code:

package com.mongodb
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig,WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document

object FirstApp {
  def main(args: Array[String]) {

    val sc = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
    .getOrCreate()

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc, readConfig)

    println(customRdd.count)
    println(customRdd.first.toJson)

 }
}

And the error after running sbt package:

    value toJson is not a member of org.apache.spark.sql.Row
[error]     println(customRdd.first.toJson)
[error]                             ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 10 s, completed Jun 10, 2020 6:10:50 PM

EDIT1:

I tried the suggested solution, but it still fails (now at runtime rather than at compile time). The build.sbt content is the same as above. I changed SimpleApp.scala to:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {

    val spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
        .getOrCreate()
    val sc = spark.sparkContext

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc)
    println(customRdd.count())
    println(customRdd.first.toJson)

 }
}

The result of running spark-submit:

$ spark-submit   --class "FirstApp"   --master local[4]   target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar 
20/06/12 07:09:53 WARN Utils: Your hostname, Project resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
20/06/12 07:09:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/06/12 07:09:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/12 07:09:54 INFO SparkContext: Running Spark version 2.4.5
20/06/12 07:09:54 INFO SparkContext: Submitted application: MongoSparkConnectorIntro
20/06/12 07:09:55 INFO SecurityManager: Changing view acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls to: sadegh
20/06/12 07:09:55 INFO SecurityManager: Changing view acls groups to: 
20/06/12 07:09:55 INFO SecurityManager: Changing modify acls groups to: 
20/06/12 07:09:55 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(sadegh); groups with view permissions: Set(); users  with modify permissions: Set(sadegh); groups with modify permissions: Set()
20/06/12 07:09:55 INFO Utils: Successfully started service 'sparkDriver' on port 33031.
20/06/12 07:09:55 INFO SparkEnv: Registering MapOutputTracker
20/06/12 07:09:55 INFO SparkEnv: Registering BlockManagerMaster
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/06/12 07:09:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/06/12 07:09:55 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7405e1be-08e8-4f58-b88e-b8f01f8fe87e
20/06/12 07:09:55 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/06/12 07:09:55 INFO SparkEnv: Registering OutputCommitCoordinator
20/06/12 07:09:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
20/06/12 07:09:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
20/06/12 07:09:56 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4041
20/06/12 07:09:56 INFO SparkContext: Added JAR file:/Folder/target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar at spark://10.0.2.15:33031/jars/root-2_2.11-0.1.0-SNAPSHOT.jar with timestamp 1591938596069
20/06/12 07:09:56 INFO Executor: Starting executor ID driver on host localhost
20/06/12 07:09:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42815.
20/06/12 07:09:56 INFO NettyBlockTransferService: Server created on 10.0.2.15:42815
20/06/12 07:09:56 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/06/12 07:09:56 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.2.15:42815 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.2.15, 42815, None)
20/06/12 07:09:56 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 42815, None)
Exception in thread "main" java.lang.NoClassDefFoundError: com/mongodb/spark/config/ReadConfig$
    at FirstApp$.main(SimpleApp.scala:16)
    at FirstApp.main(SimpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.config.ReadConfig$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 14 more
20/06/12 07:09:56 INFO SparkContext: Invoking stop() from shutdown hook
20/06/12 07:09:56 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4041
20/06/12 07:09:56 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/12 07:09:56 INFO MemoryStore: MemoryStore cleared
20/06/12 07:09:56 INFO BlockManager: BlockManager stopped
20/06/12 07:09:56 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/12 07:09:56 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/12 07:09:56 INFO SparkContext: Successfully stopped SparkContext
20/06/12 07:09:56 INFO ShutdownHookManager: Shutdown hook called
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-7f90ac08-403c-4a3f-bb45-ea24a347c380
20/06/12 07:09:56 INFO ShutdownHookManager: Deleting directory /tmp/spark-78cb32aa-c6d1-4ba4-b94f-16d3761d181b

EDIT2:

I added .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1") to SimpleApp.scala, but the error remains the same as in the EDIT1 section:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstApp {
  def main(args: Array[String]) {

    val spark = SparkSession.builder()
        .master("local")
        .appName("MongoSparkConnectorIntro")
        .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1")
        .getOrCreate()
    val sc = spark.sparkContext

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd = MongoSpark.load(sc)
    println(customRdd.count())
    println(customRdd.first.toJson)

 }
}

Upvotes: 5

Views: 1695

Answers (3)

Faaiz

Reputation: 685

I don't know if you were able to solve your issue, but this is how it worked for me. You need to provide --packages to spark-submit, as follows:

spark-submit --class "FirstApp" --master local[4] --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1  target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar
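
The same coordinates can also be passed as a submit-time conf, which should be equivalent to --packages (spark.jars.packages is the property behind that flag). Note that setting spark.jars.packages inside the application, as in EDIT2, comes too late: by the time that line runs, spark-submit has already launched the driver JVM without the connector on its classpath, which would explain the NoClassDefFoundError. A sketch of the alternative form:

spark-submit --class "FirstApp" --master local[4] --conf spark.jars.packages=org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 target/scala-2.11/root-2_2.11-0.1.0-SNAPSHOT.jar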

Upvotes: 0

koiralo

Reputation: 23109

Here are the detailed steps to create a Scala project that reads data from MongoDB with Apache Spark.

You can create the project with an IDE, or manually with the following files included:

  1. SparkMongo/project/plugins.sbt
  2. SparkMongo/src/main/scala/com/test/FirstMongoSparkApp.scala
  3. SparkMongo/build.sbt

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

build.sbt

name := "SparkMongo"
version := "0.1"
scalaVersion := "2.11.12"

val sparkVersion = "2.4.1"
val mongoSparkVersion = "2.4.1"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" %  mongoSparkVersion ,
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

assemblyJarName in assembly := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

SparkMongo/src/main/scala/com/test/FirstMongoSparkApp.scala

package com.test

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

object FirstMongoSparkApp extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkProject")
    .config("spark.mongodb.input.uri", "mongodb://localhost/test.cities")
    .config("spark.mongodb.output.uri", "mongodb://localhost/test.outputCities")
    .getOrCreate()

  import spark.implicits._

  val readConfig = ReadConfig(Map("collection" -> "cities", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(spark.sparkContext)))
  val customRdd = MongoSpark.load(spark.sparkContext, readConfig)

  customRdd.toDF().show(false)

}

Now you can run sbt assembly, which will generate the jar file SparkMongo_2.11-0.1.jar.

You can run the jar file as:

spark-submit --class "com.test.FirstMongoSparkApp" --master "local" target/scala-2.11/SparkMongo_2.11-0.1.jar

To run without issues, make sure the Spark version you run with matches the one in the dependencies (2.4.1 in this case) and that your MongoDB version is 2.6+.
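
As a small follow-up, here is a sketch (not part of the steps above, and just one way to do it) of writing the loaded data back out. The session already sets spark.mongodb.output.uri to test.outputCities, and MongoSpark.save without an explicit WriteConfig should pick up that default; an explicit WriteConfig variant is shown as well:

// continues from FirstMongoSparkApp above (customRdd and spark in scope)
import com.mongodb.spark.config.WriteConfig

// uses the default WriteConfig derived from spark.mongodb.output.uri
MongoSpark.save(customRdd.toDF())

// or target a collection explicitly
val writeConfig = WriteConfig(Map("collection" -> "outputCities"), Some(WriteConfig(spark.sparkContext)))
MongoSpark.save(customRdd.toDF(), writeConfig)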

Upvotes: 1

Boris Azanov

Reputation: 4481

I think your problem is that you are trying to use a SparkSession as a SparkContext, but they are not the same thing. If you replace sc with the SparkContext, everything will compile:

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
    .getOrCreate()
val sc = spark.sparkContext

val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
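
To tie this back to the original compile error, here is a short sketch continuing from the snippet above (same spark, sc, and readConfig in scope). Passing the SparkSession to MongoSpark.load returns a DataFrame, whose Rows have no toJson, which is exactly the error from sbt package; passing the SparkContext returns an RDD of org.bson.Document, which does have toJson. It also shows readConfig actually being passed, since the snippet above defines it without using it:

// DataFrame of Rows: good for show()/Spark SQL, but Row has no .toJson
val df = MongoSpark.load(spark, readConfig)
df.show(false)

// RDD[org.bson.Document]: each document has .toJson
val docsRdd = MongoSpark.load(sc, readConfig)
println(docsRdd.first.toJson)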

Upvotes: 0
