CUTLER

Reputation: 71

Load data with where clause in spark dataframe

I have an Oracle table with n records. I want to load the data from that table into a Spark DataFrame with a where/filter condition, without loading the complete table into a DataFrame first and then applying a filter on it. Is there any option in spark.read.format("jdbc")...etc, or any other solution?

Upvotes: 4

Views: 4131

Answers (4)

You can use the following options for this use case:

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Creating the dataframe based on the query condition:

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Upvotes: 0

Rahul Kumar

Reputation: 2345

Spark does support predicate pushdown for the JDBC source.

You can simply load the dataframe using spark.read.format("jdbc") and run a filter using .where() on top of that df; you can then check that Spark SQL predicate pushdown is being applied.

In Spark SQL you can see the exact query that ran against the database, and you will find the WHERE clause being added.

So you don't need to add anything extra for it.
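
A minimal sketch (assuming an Oracle source with an employees table; the URL, credentials and predicate are placeholders, not from the original answer):

val employees = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//<host>:<port>/<service>")
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("dbtable", "employees")
  .option("user", "<user>")
  .option("password", "<password>")
  .load()
  .where("emp_no < 10008") // simple predicate, pushed down to the database as a WHERE clause

// The PushedFilters section of the physical plan shows the predicate sent to the DB
employees.explain(true)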

For more details, refer to this article by Databricks: https://docs.databricks.com/data/data-sources/sql-databases.html#push-down-optimization

Upvotes: 0

satish koppisetty

Reputation: 19

Try this:

val sourceDf = spark.read.format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", "(select * from dbo.employee c where c.joindate > '2018-11-19 00:00:00.000') as subq")
  .option("numPartitions", 6)
  .option("partitionColumn", "depId")
  .option("lowerBound", 1)
  .option("upperBound", 100)
  .option("user", user)
  .option("password", pass)
  .load()

It applies the where condition along with partitioned reads.

Upvotes: 1

s.polam

Reputation: 10382

Check the code below. You can write your own query inside the query variable. To load the data in parallel, also set the partitionColumn, lowerBound & upperBound options.

val query = """
  (select columnA,columnB from table_name
    where <where conditions>) table
"""  
val options = Map(
    "url"              -> "<url>",
    "driver"           -> "<driver class>",
    "user"             -> "<user>",
    "password"         -> "<password>",
    "dbtable"          -> query,
    "partitionColumn"  -> "<partition column>",
    "numPartitions"    -> "<number of partitions>", // required together with partitionColumn, lowerBound & upperBound
    "lowerBound"       -> "<lower bound value>",
    "upperBound"       -> "<upper bound value>"
)

val df = spark
        .read
        .format("jdbc")
        .options(options)
        .load()

Upvotes: 2
