Sai Mammahi
Sai Mammahi

Reputation: 227

How to create table in mysql database using apache spark

I am trying to create a spark application which is useful to create, read, write and update MySQL data. So, is there any way to create a MySQL table using Spark?

Below I have a Scala-JDBC code that creates a table in MySQL database. How can I do this through Spark?

package SparkMysqlJdbcConnectivity

import org.apache.spark.sql.SparkSession
import java.util.Properties
import java.lang.Class
import java.sql.Connection
import java.sql.DriverManager

object MysqlSparkJdbcProgram {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("MysqlJDBC Connections")
      .master("local[*]")
      .getOrCreate()

    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/world"
    val operationtype = "create table"
    val tablename = "country"
    val tablename2 = "state"

    val connectionProperties = new Properties()

    connectionProperties.put("user", "root")
    connectionProperties.put("password", "root")

    val jdbcDf = spark.read.jdbc(url, s"${tablename}", connectionProperties)

    operationtype.trim() match {
      case "create table" => {
       // Class.forName(driver)
        try{
          val con:Connection = DriverManager.getConnection(url,connectionProperties)
          val result = con.prepareStatement(s"create table ${tablename2} (name varchar(255), country varchar(255))").execute()
          println(result)
          if(result) println("table creation is unsucessful") else println("table creation is unsucessful")
        }
      }

      case "read table" => {

        val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)
        jdbcDf.show()
      }

      case "write table" => {}

      case "drop table"  => {}

    }

  }

}

Upvotes: 2

Views: 6695

Answers (1)

Ravikumar
Ravikumar

Reputation: 1131

The tables will be created automatically when you write the jdbcDf dataframe.

jdbcDf
 .write
 .jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)

In case if you want to specify the table schema,

jdbcDf
 .write
 .option("createTableColumnTypes", "name VARCHAR(500), col1 VARCHAR(1024), col3 int")
 .jdbc("jdbc:mysql://localhost:3306/world", s"${tablename}", connectionProperties)

Upvotes: 6

Related Questions