Reputation: 71
I have an Oracle table which has n number of records. Now I want to load the data from that table into a Spark dataframe with a where/filter condition. I do not want to load the complete data into a dataframe and then apply the filter on it. Is there any option in spark.read.format("jdbc")...etc, or any other solution?
Upvotes: 4
Views: 4131
Reputation: 2334
You can use the following options for this use case.
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
Creating the dataframe based on the query condition:
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)
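Since the question is about an Oracle table rather than SQL Server, a minimal sketch of the same pushdown-query pattern against Oracle might look like this (the host/port/service variables, credentials and the employees table are placeholders, and the Oracle JDBC driver jar is assumed to be on the classpath):
# Sketch: same pushdown-query approach, adapted for an Oracle source.
# oracleHost, oraclePort, oracleService, oracleUsername, oraclePassword and
# the employees table are placeholders.
jdbcUrl = "jdbc:oracle:thin:@//{0}:{1}/{2}".format(oracleHost, oraclePort, oracleService)
connectionProperties = {
    "user": oracleUsername,
    "password": oraclePassword,
    "driver": "oracle.jdbc.OracleDriver"
}

# Only the rows matching the subquery's WHERE clause are read from the database.
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)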
Upvotes: 0
Reputation: 2345
Spark does support predicate pushdown for the JDBC source.
You can simply load the dataframe using spark.read.format("jdbc")
and run a filter using .where()
on top of that df; you can then check that Spark SQL's predicate pushdown was applied.
In Spark SQL you can see the exact query that ran against the database, and you will find the WHERE clause added to it.
So you don't need to add anything extra for it.
For more details, refer to this article by Databricks: https://docs.databricks.com/data/data-sources/sql-databases.html#push-down-optimization
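As an illustration (a sketch with placeholder Oracle connection details, not code taken from the article), you can verify the pushdown by inspecting the physical plan:
# Sketch: load the table over JDBC, filter with .where(), and confirm the
# predicate is pushed down. URL, credentials and table name are placeholders.
df = (spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//host:1521/service")
      .option("dbtable", "employees")
      .option("user", "<user>")
      .option("password", "<password>")
      .option("driver", "oracle.jdbc.OracleDriver")
      .load())

filtered = df.where("emp_no < 10008")

# The predicate should show up under PushedFilters in the JDBC scan of the
# physical plan, meaning the WHERE clause is executed on the database side.
filtered.explain()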
Upvotes: 0
Reputation: 19
Try this:
val sourceDf = spark.read
  .format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", "(select * from dbo.employee c where c.joindate > '2018-11-19 00:00:00.000') as subq")
  .option("numPartitions", 6)
  .option("partitionColumn", "depId")
  .option("lowerBound", 1)
  .option("upperBound", 100)
  .option("user", user)
  .option("password", pass)
  .load()
This applies the where condition inside the subquery and also reads the data in parallel using the partition options.
Upvotes: 1
Reputation: 10382
Check the code below. You can write your own query inside the query variable. To load the data in parallel, look at the partitionColumn, lowerBound & upperBound options.
val query = """
(select columnA,columnB from table_name
where <where conditions>) table
"""
val options = Map(
"url" -> "<url>".
"driver" -> "<driver class>".
"user" -> "<user>".
"password" -> "<password>".
"dbtable" -> query,
"partitionColumn" -> "",
"lowerBound" -> "<lower bound values>",
"upperBound" -> "<upper bound values>"
)
val df = spark
.read
.format("jdbc")
.options(options)
.load()
Upvotes: 2