Metadata

Reputation: 2083

Filtering and selecting data from a DataFrame in Spark

I am working on a Spark-JDBC program and have come up with the following code so far:

import java.io.FileInputStream
import java.util.Properties

import org.apache.log4j.{Level, LogManager, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object PartitionRetrieval {
    val conf = new SparkConf().setAppName("Spark-JDBC")
    val log  = LogManager.getLogger("Spark-JDBC Program")
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Connection details are read from an external properties file.
    val conFile       = "/home/hmusr/ReconTest/inputdir/testconnection.properties"
    val properties    = new Properties()
    properties.load(new FileInputStream(conFile))
    val connectionUrl = properties.getProperty("gpDevUrl")
    val devUserName   = properties.getProperty("devUserName")
    val devPassword   = properties.getProperty("devPassword")
    val driverClass   = properties.getProperty("gpDriverClass")
    val tableName     = "source.bank_accounts"

    try {
        Class.forName(driverClass).newInstance()
    } catch {
        case cnf: ClassNotFoundException =>
            log.error("Driver class: " + driverClass + " not found")
            System.exit(1)
        case e: Exception =>
            log.error("Exception while loading driver class", e)
            System.exit(1)
    }

    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().getOrCreate()
        val gpTable = spark.read.format("jdbc")
            .option("url", connectionUrl)
            .option("dbtable", tableName)
            .option("user", devUserName)
            .option("password", devPassword)
            .load()
        val rc = gpTable.filter(gpTable("source_system_name") === "ORACLE").count()
        println("gpTable Count: " + rc)
    }
}

In the above code, will the statement val gpTable = spark.read.format("jdbc").option("url", connectionUrl)... dump the whole data of the table bank_accounts into the DataFrame gpTable, with rc then holding the count of the filtered data? I have this doubt because bank_accounts is a very small table, so it doesn't matter if it is loaded into memory as a whole DataFrame. But in our production environment there are tables with billions of records. In that case, what is the recommended way to load data into a DataFrame using a JDBC connection? Could anyone explain the concept of Spark-JDBC's entry point here?

Upvotes: 1

Views: 1494

Answers (1)

Alper t. Turker

Reputation: 35249

  • will the statement ... dump the whole data of the table: bank_accounts into the DataFrame: gpTable and then DataFrame: rc gets the filtered data.

    No. DataFrameReader is not eager. It only defines data bindings.

    Additionally, simple predicates, like trivial equality checks, are pushed down to the source, and only the required columns should be loaded when the plan is executed (see the explain() sketch after this list).

    In the database log you should see a query similar to

     SELECT 1 FROM table WHERE source_system_name = 'ORACLE'
    
  • if it is loaded into memory as a dataframe as a whole.

    No. Spark doesn't load data into memory unless it is instructed to (primarily cache), and even then it limits itself to the blocks that fit into the available storage memory.

    During standard processing it keeps only the data that is required to compute the plan. For the global plan, the memory footprint shouldn't depend on the amount of data.

  • In that case what is the recommended way to load data into a DataFrame using a JDBC connection ?

    Please check Partitioning in spark while reading from RDBMS via JDBC, Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?, and https://stackoverflow.com/a/45028675/8371915 for questions related to scalability. A partitioned-read sketch follows after this list.

    Additionally, you can read Does spark predicate pushdown work with JDBC?
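
To see the laziness and the pushdown yourself, here is a minimal sketch reusing the gpTable DataFrame from the question. The PushedFilters line in the comments is illustrative only; its exact formatting differs between Spark versions.

    // Nothing has been read from the database yet; gpTable and filtered are only plans.
    val filtered = gpTable.filter(gpTable("source_system_name") === "ORACLE")

    // Prints the physical plan. For a JDBC source it should contain a line similar to
    //   PushedFilters: [*IsNotNull(source_system_name), *EqualTo(source_system_name,ORACLE)]
    // showing that the equality predicate is pushed to the database.
    filtered.explain()

    // Only an action such as count() triggers the actual JDBC query.
    val filteredCount = filtered.count()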

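For the scalability part, a minimal sketch of a partitioned JDBC read, reusing the connection settings from the question's code. The partition column account_id and the bounds are assumptions for illustration only; lowerBound and upperBound merely control the partition stride and do not filter any rows.

    // Hypothetical partitioned read; "account_id" and the bounds are illustrative assumptions.
    val partitionedTable = spark.read.format("jdbc")
        .option("url", connectionUrl)
        .option("dbtable", tableName)
        .option("user", devUserName)
        .option("password", devPassword)
        .option("partitionColumn", "account_id") // a numeric column of the table
        .option("lowerBound", "1")               // with upperBound, defines the stride of the
        .option("upperBound", "100000000")       //   generated WHERE clauses; no rows are dropped
        .option("numPartitions", "32")           // number of parallel partitions/connections
        .load()
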
Upvotes: 2
