Reputation: 1565
When I feed Spark through a JDBC connection, even though I apply a filter on the DataFrame, the query log on my Oracle data source shows Spark executing:
SELECT [column_names] FROM MY_TABLE
Referring to https://stackoverflow.com/a/40870714/1941560, I was expecting Spark to plan the query lazily and execute something like:
SELECT [column_names] FROM MY_TABLE WHERE [filter_predicate]
But Spark is not doing that: it fetches all the data and filters afterwards. I need the pushed-down behaviour because I don't want to retrieve the whole table every x minutes, only the rows that changed (incremental filtering by UPDATE_DATE).
Is there a way to achieve this?
Here is my python code:
import datetime
import pytz

df = ...  # DataFrame read over JDBC
lookup_seconds = 5 * 60
now = datetime.datetime.now(pytz.timezone("some timezone"))
max_lookup_datetime = now - datetime.timedelta(seconds=lookup_seconds)
df.where(df.UPDATE_DATE > max_lookup_datetime).explain()
Explain result:
== Physical Plan ==
*Filter (isnotnull(UPDATE_DATE#21) && (UPDATE_DATE#21 > 1516283483208806))
+- Scan ExistingRDD[NO#19,AMOUNT#20,UPDATE_DATE#21,CODE#22,AMOUNT_OLD#23]
EDIT: Complete answer is here
Upvotes: 2
Views: 2111
Reputation: 107
From the official docs:
dbtable: The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.
You could set the JDBC option dbtable to a subquery instead of a table name. Note that max_lookup_datetime is a Python variable, so its value has to be rendered into the SQL string as a timestamp literal. For example:
# Render the cutoff as a SQL timestamp literal; a bare Python variable
# name inside the query string would be sent to the database verbatim.
pushdown_query = (
    "(select * from tbl where UPDATE_DATE > timestamp '{}') t"
).format(max_lookup_datetime.strftime("%Y-%m-%d %H:%M:%S"))

jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", pushdown_query) \
    .option("user", "username") \
    .option("password", "password") \
    .load()
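For an Oracle source like the one in the question, the same idea works with TO_TIMESTAMP. A minimal sketch, where the JDBC URL, table and column names are assumptions based on the question:
# Hedged sketch for an Oracle source: render the cutoff with TO_TIMESTAMP
# so the predicate is evaluated entirely inside the database.
cutoff = max_lookup_datetime.strftime("%Y-%m-%d %H:%M:%S")
oracle_query = (
    "(SELECT * FROM MY_TABLE "
    "WHERE UPDATE_DATE > TO_TIMESTAMP('{}', 'YYYY-MM-DD HH24:MI:SS')) t"
).format(cutoff)

oracleDF = (spark.read
    .format("jdbc")
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")  # hypothetical URL
    .option("dbtable", oracle_query)
    .option("user", "username")
    .option("password", "password")
    .load())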
Upvotes: 1
Reputation: 330413
The most likely scenario here is that you cache the input DataFrame. In that case Spark won't attempt selection or projection pushdown; instead it will fetch the data to the cluster and process it locally. It is easy to illustrate this behavior:
df = spark.read.jdbc(url, table, properties={})
df
DataFrame[id: int, UPDATE_DATE: timestamp]
If data is not cached:
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Physical Plan ==
*Scan JDBCRelation(df) [numPartitions=1] [UPDATE_DATE#1] PushedFilters: [*IsNotNull(UPDATE_DATE), *GreaterThan(UPDATE_DATE,2018-01-18 15:35:13.07596)], ReadSchema: struct<UPDATE_DATE:timestamp>
both selection and projection are pushed down. However, if you cache df and check the execution plan once again:
df.cache()
DataFrame[id: int, UPDATE_DATE: timestamp]
df.select("UPDATE_DATE").where(df.UPDATE_DATE > max_lookup_datetime).explain(True)max_lookup_datetime).explain(True)
== Parsed Logical Plan ==
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Analyzed Logical Plan ==
UPDATE_DATE: timestamp
Filter (UPDATE_DATE#1 > 1516289713075960)
+- Project [UPDATE_DATE#1]
+- Relation[id#0,UPDATE_DATE#1] JDBCRelation(df) [numPartitions=1]
== Optimized Logical Plan ==
Project [UPDATE_DATE#1]
+- Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>
== Physical Plan ==
*Filter (isnotnull(UPDATE_DATE#1) && (UPDATE_DATE#1 > 1516289713075960))
+- InMemoryTableScan [UPDATE_DATE#1], [isnotnull(UPDATE_DATE#1), (UPDATE_DATE#1 > 1516289713075960)]
+- InMemoryRelation [id#0, UPDATE_DATE#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Scan JDBCRelation(df) [numPartitions=1] [id#0,UPDATE_DATE#1] ReadSchema: struct<id:int,UPDATE_DATE:timestamp>
both projection and selection are deferred: they run locally against the cached data instead of being pushed down to the database.
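If you need both caching and pushdown, one option is to apply the filter before caching, so the predicate is part of the JDBC scan that materializes the cache. A minimal sketch, reusing the names from the snippets above:
# Hedged sketch: filter first, then cache. The predicate is pushed into the
# JDBC scan that populates the cache, so only matching rows are fetched.
filtered = df.where(df.UPDATE_DATE > max_lookup_datetime)
filtered.cache()

# PushedFilters should now appear in the plan that feeds the cached data.
filtered.select("UPDATE_DATE").explain(True)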
Upvotes: 2