omer khalid

Reputation: 895

How to Connect Spark SQL with a MySQL Database in Scala

Problem Statement:

Hi, I am a newbie to the Spark world. I want to query a MySQL database, load one of its tables into Spark, and then apply a filter to that table using a SQL query. Once the result is filtered, I want to return it as JSON. All of this has to be done from a standalone Scala application.

I am struggling to initialize the SparkContext and am getting an error. I know I am missing some piece of information.

Can somebody have a look at the code and tell me what I need to do?

Code:

import application.ApplicationConstants
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SparkSession, Dataset, Row, Column, SQLContext}

var sc: SparkContext = null

val sparkSession = SparkSession.builder().master("spark://10.62.10.71:7077")
  .config("format", "jdbc")
  .config("url", "jdbc:mysql://localhost:3306/test")
  .config("user", "root")
  .config("password", "")
  .appName("MySQLSparkConnector")
  .getOrCreate()

var conf = new SparkConf()
conf.setAppName("MongoSparkConnectorIntro")
  .setMaster("local")
  .set("format", "jdbc")
  .set("url", "jdbc:mysql://localhost:3306/test")
  .set("user", "root")
  .set("password", "")

sc = new SparkContext(conf)

val connectionProperties = new java.util.Properties
connectionProperties.put("user", username)
connectionProperties.put("password", password)

val customDF2 = sparkSession.read.jdbc(url, "employee", connectionProperties)

println("program ended")

Error:

Following is the error that I am getting:

64564 [main] ERROR org.apache.spark.SparkContext - Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:560)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$5(SparkSession.scala:935)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
    at manager.SparkSQLMySQLDBConnector$.main(SparkSQLMySQLDBConnector.scala:21)
    at manager.SparkSQLMySQLDBConnector.main(SparkSQLMySQLDBConnector.scala)
64566 [main] INFO org.apache.spark.SparkContext - SparkContext already stopped.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:560)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$5(SparkSession.scala:935)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
    at manager.SparkSQLMySQLDBConnector$.main(SparkSQLMySQLDBConnector.scala:21)
    at manager.SparkSQLMySQLDBConnector.main(SparkSQLMySQLDBConnector.scala)

P.S.: If anybody can point me to a link or tutorial that shows a similar scenario with Scala, that would also help.

Versions:

Spark: 2.4.0
Scala: 2.12.8
MySQL Connector Jar: 8.0.13

Upvotes: 0

Views: 1827

Answers (1)

koiralo

Reputation: 23119

I think you are mixing up the creation of the SparkContext and the configs to connect to MySQL.

If you are using Spark 2.0+, use only SparkSession as the entry point:

val spark = SparkSession.builder().master("local[*]").appName("Test").getOrCreate

//Add the connection properties as below
val prop = new java.util.Properties()
prop.put("user", "user")
prop.put("password", "password")
val url = "jdbc:mysql://host:port/dbName"

Now read the table as a DataFrame:

val df = spark.read.jdbc(url, "tableName", prop)
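From there, the filtering and JSON steps the question asks about can be sketched as below. This is a minimal sketch, not a definitive solution: the "employee" view name and the "salary" column are assumptions, since the question does not show the table's schema.

```scala
// Register the DataFrame so it can be queried with plain SQL
df.createOrReplaceTempView("employee")

// Apply a filter with a SQL query (the "salary" column is an assumed example)
val filtered = spark.sql("SELECT * FROM employee WHERE salary > 50000")

// Convert each row of the filtered result to a JSON string
val jsonResult: Array[String] = filtered.toJSON.collect()
jsonResult.foreach(println)
```

Note that collect() pulls the result back to the driver, which is fine for a small filtered result but should be avoided for large tables.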

Both the sparkContext and the sqlContext can be accessed from the SparkSession:

val sc = spark.sparkContext

val sqlContext = spark.sqlContext

Make sure you have the mysql-connector-java jar on the classpath; add the dependency to your pom.xml or build.sbt.
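For example, in build.sbt (the connector version here matches the one mentioned in the question; adjust it to your setup):

```scala
// MySQL JDBC driver, needed at runtime for spark.read.jdbc against MySQL
libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.13"
```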

Hope this helps!

Upvotes: 1
