jlaralopez

Reputation: 35

How to pushdown partitioning when reading from jdbc Spark

I want to read from a table via a JDBC connection, using both filtering with a WHERE clause and partitioning on another column with the options partitionColumn, lowerBound, upperBound and numPartitions.

Currently both filtering and partitioning work, however I notice that the partitioning is not being pushed down: I get a SELECT * FROM mytable as a CTE, and then the partitioning is applied on top of that CTE, resulting in 200 queries (200 given by numPartitions) like:

SELECT * FROM (SELECT * FROM TABLE WHERE FILTER_COLUMN = value) TBL WHERE PARTITION_COLUMN >= lowerBound_value AND PARTITION_COLUMN < upperBound_value;

Instead, I would like the 200 queries (200 given by numPartitions) to look like:

SELECT * FROM (SELECT * FROM TABLE WHERE FILTER_COLUMN = value AND PARTITION_COLUMN >= lowerBound_value AND PARTITION_COLUMN < upperBound_value) TBL;

Is there a way to push the partitioning down into the CTE? The idea is to avoid the double SELECT *: the table is big, and running that query 200 times is slow. This is my code:

#Gather the value for lowerBound
query_min = ''' (SELECT 
    MIN(PARTITION_COLUMN) AS MIN_VALUE
    FROM SCHEMA.TABLE )TBL '''

min_df = sparkSession.read \
    .format("jdbc") \
    .option("dbtable", query_min) \
    .load()
minval = min_df.head(1)[0][0]


#Gather the value for upperBound
query_max = ''' (SELECT 
    MAX(PARTITION_COLUMN) AS MAX_VALUE
    FROM SCHEMA.TABLE )TBL '''

max_df = sparkSession.read \
    .format("jdbc") \
    .option("dbtable", query_max) \
    .load()
maxval = max_df.head(1)[0][0]

#Read the table filtering with FILTER_COLUMN, and partitioning with PARTITION_COLUMN
query = ''' (SELECT * FROM SCHEMA.TABLE
WHERE FILTER_COLUMN = VALUE )TBL '''

df = sparkSession.read.format("jdbc")\
                    .option("dbtable", query)\
                    .option("partitionColumn", "PARTITION_COLUMN")\
                    .option("numPartitions", "200")\
                    .option("lowerBound", minval)\
                    .option("upperBound", maxval)\
                    .load()
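For context on where the 200 outer WHERE clauses come from: Spark's JDBC reader splits the [lowerBound, upperBound] range into numPartitions stride-sized predicates, one per partition. A simplified sketch of that logic (`partition_predicates` is a hypothetical helper for illustration, not a Spark API; the real implementation lives inside Spark's JDBC relation code):

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Simplified sketch of how Spark's JDBC source derives one WHERE
    clause per partition from lowerBound/upperBound/numPartitions.
    Hypothetical helper for illustration only, not part of the Spark API."""
    stride = (upper - lower) // num_partitions
    predicates, current = [], lower + stride
    for i in range(num_partitions):
        if i == 0:
            # The first partition also picks up NULLs and values below the range.
            predicates.append(f"{column} < {current} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # The last partition is open-ended above, so no rows are lost.
            predicates.append(f"{column} >= {current - stride}")
        else:
            predicates.append(
                f"{column} >= {current - stride} AND {column} < {current}")
        current += stride
    return predicates

for p in partition_predicates("PARTITION_COLUMN", 1, 1000, 4):
    print(p)
```

With numPartitions set to 200, this same stride logic yields the 200 outer predicates you observed wrapped around the dbtable subquery.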

Upvotes: 0

Views: 172

Answers (1)

Kombajn zbożowy

Reputation: 10693

Nothing to do here. Both queries, the one that is run and the one you would like to have, are equivalent. Predicate pushdown happens under the hood, and any DBMS optimizer will be smart enough to run a single scan over the table with all filters applied, regardless of whether they appear in the inner or the outer query.

You can ask the database to explain the query plan to prove it. Try this in your database or on SQL Fiddle (the example here is for Postgres):

create table foo (a int, b int);

insert into foo
  select generate_series, generate_series * 2
  from generate_series(1, 10000);

explain
  select * from foo where b % 2 = 0 and a > 1 and a < 1000;

explain
  select * from (select * from foo where b % 2 = 0) x where a > 1 and a < 1000;

explain
  select * from (select * from foo) x where b % 2 = 0 and a > 1 and a < 1000;

explain
  select * from (select * from (select * from foo where a > 1) x where a < 1000) x where b % 2 = 0;

The output of each of the explain statements is:

Seq Scan on foo (cost=0.00..248.40 rows=1 width=8)
Filter: ((a > 1) AND (a < 1000) AND ((b % 2) = 0))
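The same equivalence is easy to check programmatically. As a rough parallel to the Postgres example above, SQLite (via Python's built-in sqlite3 module) also flattens the subquery, so the flat and the nested form produce the identical plan (same toy table `foo` as above):

```python
import sqlite3

# In-memory SQLite database with the same toy table as the Postgres example.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE foo (a INTEGER, b INTEGER)")
con.executemany("INSERT INTO foo VALUES (?, ?)",
                [(i, i * 2) for i in range(1, 10001)])

def plan(sql):
    # EXPLAIN QUERY PLAN returns rows whose last column describes each step.
    return [row[-1] for row in con.execute("EXPLAIN QUERY PLAN " + sql)]

flat = plan("SELECT * FROM foo WHERE b % 2 = 0 AND a > 1 AND a < 1000")
nested = plan("SELECT * FROM (SELECT * FROM foo WHERE b % 2 = 0) x "
              "WHERE a > 1 AND a < 1000")

# The optimizer flattens the subquery: both plans are a single scan of foo.
print(flat == nested)
```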

Upvotes: 0

Related Questions