Reputation: 1655
I am using Spark JDBC to read data from an MS SQL database, but I am getting some weird results.
For example, below is my code to read records from my MS SQL database. Please note that the tables I am reading from are continuously receiving new records.
//Extract Data from JDBC source
val jdbcTable = sqlContext.read.format("jdbc").options(
Map(
"url" -> jdcbUrl,
"driver" -> "net.sourceforge.jtds.jdbc.Driver",
"dbtable" ->
s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
.load
println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")
val updateJdbcDF = jdbcTable
.withColumn("ID-COL1", trim($"COl1"))
.withColumn("ID-COL2", trim($"COl2"))
println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")
I get two different count values each time I run my program: updateJdbcDF.count() is always greater than jdbcTable.count().
Can somebody explain why this is happening? It is creating a lot of issues in my use case. How can I keep the row count of the jdbcTable DataFrame fixed once it has been created? I tried jdbcTable.cache(), but no luck.
The record count just gets bigger and bigger whenever I run an operation on any DataFrame derived from the jdbcTable DataFrame. Is the jdbcTable DataFrame re-read from the database every time I use a DataFrame derived from it?
Upvotes: 0
Views: 1643
Reputation: 1655
I was able to fix this issue by calling jdbcTable.cache(). Now any DataFrame derived from the jdbcTable DataFrame no longer gives me a higher count than jdbcTable.count(). All calculations are OK now. Thanks for the explanation, @GPI.
//Extract Data from JDBC source
val jdbcTable = sqlContext.read.format("jdbc").options(
Map(
"url" -> jdcbUrl,
"driver" -> "net.sourceforge.jtds.jdbc.Driver",
"dbtable" ->
s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
.load
jdbcTable.cache()
println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")
val updateJdbcDF = jdbcTable
.withColumn("ID-COL1", trim($"COl1"))
.withColumn("ID-COL2", trim($"COl2"))
println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")
/**
 * MORE DATA PROCESSING HERE
 */
jdbcTable.unpersist()
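One caveat: cache() is lazy, so the cached data is only populated by the first action (the STEP 1 count here). If cached partitions are later lost, for example when an executor dies, Spark recomputes them by running the JDBC query again, which could pick up newly inserted rows. A more defensive option is to bound the query on both sides so that every execution returns the same rows. The following is only a sketch under assumptions: boundedQuery, selectAndJoins and currentExtractUnixTime are hypothetical names that are not in the code above, and it assumes tbl.ID only ever increases, so no row is inserted later with an ID at or below the upper bound.

```scala
object BoundedExtract {

  /** Builds the "dbtable" subquery with a lower AND an upper bound on tbl.ID.
    *
    * `selectAndJoins` is a hypothetical placeholder for the SELECT ... JOIN part,
    * which the original post elides.
    */
  def boundedQuery(selectAndJoins: String, lowerExclusive: Long, upperInclusive: Long): String = {
    require(lowerExclusive <= upperInclusive, "lower bound must not exceed upper bound")
    s"($selectAndJoins WHERE tbl.ID > $lowerExclusive AND tbl.ID <= $upperInclusive) as t"
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values: the previous high-water mark and an upper bound
    // captured once, at the start of this run.
    val lastExtractUnixTime = 1700000000L
    val currentExtractUnixTime = 1700003600L

    val dbtable = boundedQuery("SELECT COLUMNS WITH INNER JOINS", lastExtractUnixTime, currentExtractUnixTime)
    println(dbtable)
  }
}
```

The resulting string would be passed as "dbtable" -> dbtable in the options Map, in place of the open-ended query. Keeping jdbcTable.cache() on top of that still avoids hitting the database repeatedly, but the counts no longer depend on the cache staying intact.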
Upvotes: 1