Rudresh Ajgaonkar

Reputation: 790

AWS Glue: How to read a JDBC source via the Spark object in Scala

I am trying to do the following.

Read data from a database via JDBC. In PySpark, I was able to do this with the following code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
finalDataFrame = (spark.read.format("jdbc")
    .option("url", "")
    .option("dbtable", "")
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("user", "")
    .option("password", "")
    .load())

Now I want to do the same thing in Scala.

I tried the following:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.DynamicFrame
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.DataFrame


object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val sparknew = glueContext.getSparkSession

    val df_fileInterfacemap = sparknew.read.format("jdbc")
      .option("url", "<url>")
      .option("dbtable", "tablename")
      .option("nullValue", "null")
      .option("user", "<username>")
      .option("password", "<password>")
      .load()
    val dynamicframe_new = DynamicFrame(df_fileInterfacemap, glueContext)
    val datasink2 = glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions("""{"path": "s3://<S3 Path>"}"""),
      transformationContext = "datasink2",
      format = "csv"
    ).writeDynamicFrame(dynamicframe_new)
    Job.commit()
  }
}

But I got the following error:

java.sql.SQLRecoverableException: IO Error: The Network Adapter could not establish the connection

I don't want to use a crawler or the getSource method provided by the Glue API. Any help would be appreciated.
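As far as I know, the Oracle driver raises "IO Error: The Network Adapter could not establish the connection" when it cannot open a TCP connection to the host and port in the JDBC URL. That usually points to a network problem (wrong host or port, listener not running, firewall or security-group rules, or a Glue job with no network path to a database inside a VPC) rather than to the Spark reader syntax. A minimal sketch of a reachability check you could run from the job, assuming plain TCP access to the listener; `JdbcReachability` and `canConnect` are hypothetical names, not part of the Glue or Spark API:

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

// Hypothetical helper (not a Glue/Spark API): checks whether a TCP
// connection to the database listener can be opened at all.
object JdbcReachability {
  /** Returns true if host:port accepts a TCP connection within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 5000): Boolean =
    Try {
      val socket = new Socket()
      // connect() throws (ConnectException, SocketTimeoutException,
      // UnknownHostException, ...) when the endpoint is unreachable;
      // Try turns any of those into a Failure.
      try socket.connect(new InetSocketAddress(host, port), timeoutMs)
      finally socket.close()
    }.isSuccess
}
```

For example, `JdbcReachability.canConnect("HOSTNAME", 1521)` assumes the default Oracle listener port 1521. If it returns false from inside the job, any JDBC read against that URL should fail the same way, whichever reader syntax is used.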

Upvotes: 1

Views: 2698

Answers (1)

Rudresh Ajgaonkar

I was able to do this using the following code:

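import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import java.util.Properties

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val sparknew = glueContext.getSparkSession

    // Connection properties passed to the JDBC reader, including the driver class to load
    val props = new Properties()
    props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
    props.setProperty("user", "<USERNAME>")
    props.setProperty("password", "<PASSWORD>")
    props.setProperty("nullValue", "null")
    val url = "jdbc:oracle:thin:@HOSTNAME:PORT:INSTANCENAME"
    val table = "SCHEMA.TABLE"
    val df_fileInterfacemap = sparknew.read.jdbc(url, table, props)

    Job.commit()
  }
}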
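If the same job reads from several Oracle databases, the URL and Properties used here could be built by a small helper. This is a minimal sketch; `OracleJdbcConfig` and its methods are hypothetical names, and the `serviceUrl` variant assumes the database is addressed by service name (`@//host:port/service`) rather than by SID (`@host:port:SID`, as in the answer):

```scala
import java.util.Properties

// Hypothetical helper: builds the Oracle thin-driver URL and the
// Properties object that spark.read.jdbc(url, table, props) expects.
object OracleJdbcConfig {
  /** SID form, matching jdbc:oracle:thin:@HOSTNAME:PORT:INSTANCENAME. */
  def sidUrl(host: String, port: Int, sid: String): String =
    s"jdbc:oracle:thin:@$host:$port:$sid"

  /** Service-name form, for databases addressed by service name. */
  def serviceUrl(host: String, port: Int, service: String): String =
    s"jdbc:oracle:thin:@//$host:$port/$service"

  /** Driver class plus credentials, as set on `props` in the answer. */
  def props(user: String, password: String): Properties = {
    val p = new Properties()
    p.setProperty("driver", "oracle.jdbc.driver.OracleDriver")
    p.setProperty("user", user)
    p.setProperty("password", password)
    p
  }
}
```

Usage inside the job would then be `sparknew.read.jdbc(OracleJdbcConfig.sidUrl("HOSTNAME", 1521, "INSTANCENAME"), "SCHEMA.TABLE", OracleJdbcConfig.props("<USERNAME>", "<PASSWORD>"))`. `read.jdbc(url, table, props)` should behave like `read.format("jdbc")` with the `url` and `dbtable` options plus each property passed as an option, so the choice between the two styles is mostly a matter of taste.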

Hope this helps someone.

Upvotes: 3
