nilesh1212

Reputation: 1655

Apache Spark JDBC DataFrame count issue

I am using Spark JDBC to read data from an MS SQL database, but I am getting inconsistent results.

For example, below is the code I use to read records from the database. Please note that the tables I am reading from are continuously receiving new inserts.

    // Extract data from the JDBC source
    val jdbcTable = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" ->
          s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
      .load

    println(s"STEP 1] JDBC RECORD COUNT ---> ${jdbcTable.count()}")

    val updateJdbcDF = jdbcTable
      .withColumn("ID-COL1", trim($"COL1"))
      .withColumn("ID-COL2", trim($"COL2"))

    println(s"STEP 2] UPDATED JDBC RECORD COUNT ---> ${updateJdbcDF.count()}")

I get two different counts each time I run the program: updateJdbcDF.count() always comes out greater than jdbcTable.count().

Can somebody explain why this is happening? It is creating a lot of issues in my use case. How can I freeze the contents of the jdbcTable DataFrame once it has been created? I tried jdbcTable.cache(), but no luck.

The count just gets bigger and bigger whenever I run an operation on any DataFrame derived from the jdbcTable DataFrame. Does Spark re-execute the JDBC query every time I use a DataFrame derived from jdbcTable?
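
To illustrate my understanding of what might be happening (a simplified sketch; the table and column names here are placeholders, not my real schema):

    import org.apache.spark.sql.functions.trim
    import sqlContext.implicits._

    val df = sqlContext.read.format("jdbc").options(Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" -> "(SELECT * FROM my_table) AS t")).load

    df.count()                                 // job 1: runs the query once
    df.withColumn("c", trim($"COL1")).count()  // job 2: runs the query again?

If each action really does re-execute the query against the live table, that would explain why the later counts keep growing.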

Upvotes: 0

Views: 1643

Answers (1)

nilesh1212

Reputation: 1655

I was able to fix this issue by applying jdbcTable.cache(). Now any DataFrame derived from jdbcTable no longer gives a count higher than jdbcTable.count(), and all calculations are OK. Thanks for the explanation, @GPI.

    // Extract data from the JDBC source
    val jdbcTable = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" ->
          s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
      .load

    jdbcTable.cache()
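    // Note: cache() is lazy -- nothing is materialized until the first
    // action (the count below). From then on, jobs read the cached rows
    // instead of re-running the JDBC query against the live table.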

     println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")


    val updateJdbcDF = jdbcTable
      .withColumn("ID-COL1", trim($"COL1"))
      .withColumn("ID-COL2", trim($"COL2"))

   println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")
   /**
     * MORE DATA PROCESSING HERE
   /**

  jdbcTable.unpersist()
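
One caveat I would add (this is my assumption about memory pressure, not part of the original fix): if the cached partitions do not all fit in executor memory and one is dropped without a disk copy, Spark recomputes it from its lineage, i.e. it re-runs the JDBC query and can see the newly inserted rows again. Persisting with an explicit MEMORY_AND_DISK storage level makes the spill-to-disk behaviour explicit:

    import org.apache.spark.storage.StorageLevel

    // Drop-in replacement for jdbcTable.cache(): partitions that do not
    // fit in memory are spilled to disk instead of being dropped, so no
    // partition is ever recomputed from the growing source table.
    jdbcTable.persist(StorageLevel.MEMORY_AND_DISK)

The rest of the pipeline, including the final unpersist(), stays the same.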

Upvotes: 1
