H.Aadhithya

Reputation: 205

Spark Predicate pushdown not working on date

I am just reading a Parquet file and adding a filter to match all records that fall on a given date, here 2021-04-03. The column shouldn't be null and it should be on the given date.

Input table

+---------+-----------+-------------------+
|      lat|        lng|       eventDTLocal|
+---------+-----------+-------------------+
|34.269788| -98.239543|2021-04-03 19:18:58|
|29.780977| -95.749744|2021-04-03 19:33:24|
|48.150173|-122.191903|2021-04-03 17:25:00|
|40.652889| -74.185461|2021-04-03 20:27:55|
|41.747148| -87.799557|2021-04-03 19:52:39|
+---------+-----------+-------------------+

I have tried casting the column to date and using the substring_index function for matching, but I am just not able to get it into the pushed filters.

Following is the code I tried:

df1 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
        .select('lat', 'lng', 'eventDTLocal') \
        .filter("TO_DATE(CAST(UNIX_TIMESTAMP(`eventDTLocal`, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP), 'yyyy-MM-dd') == CAST('2021-04-03' AS DATE)")
df1.explain(extended=True)
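
The substring_index variant I tried was roughly of this form (a sketch, not the exact code):

df2 = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
        .select('lat', 'lng', 'eventDTLocal') \
        .filter("substring_index(`eventDTLocal`, ' ', 1) = '2021-04-03'")
df2.explain(extended=True)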

The filters are listed only under DataFilters and nowhere else. What am I missing here?

Upvotes: 3

Views: 2172

Answers (1)

werner

Reputation: 14845

Not all filters can be pushed down. In general, most filters that contain function calls like substring or unix_timestamp cannot be pushed down. The complete logic for deciding which filters get pushed down is implemented in DataSourceStrategy.
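
To see which side of the line a predicate falls on, compare the plan of a plain comparison with the plan of the same predicate wrapped in a function call. A sketch, assuming the string-typed eventDTLocal column from the question's Parquet file:

df = spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet")

#plain comparison on the raw column -> listed under PushedFilters
df.filter("eventDTLocal >= '2021-04-03 00:00:00'").explain()

#the column wrapped in a function call (substring) -> listed under DataFilters only
df.filter("substring(eventDTLocal, 1, 10) = '2021-04-03'").explain()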

A way to work around this limitation in this case is to store the values of eventDTLocal as Unix timestamps in milliseconds instead of strings in the Parquet file and then filter on the corresponding millisecond range.

#create some test data
data = [(52.5151923, 13.3824107, 1618760421000), 
        (1.0, 1.0, 1)]
spark.createDataFrame(data, schema=['lat', 'lng', 'eventDTLocal']) \
    .write.mode("overwrite").parquet("dataWithUnixTime")

#get the first and last millisecond of the day
#the time zone may need to be adjusted
from datetime import datetime, timezone
dt = datetime(2021, 4, 18)
start = dt.replace(tzinfo=timezone.utc).timestamp() * 1000
end = start + 24 * 60 * 60 * 1000 - 1

#run the query
df = spark.read.parquet("dataWithUnixTime") \
    .filter(f"eventDTLocal >= {start} and eventDTLocal <= {end}")

The physical plan of df


== Physical Plan ==
*(1) Project [lat#9, lng#10, eventDTLocal#11L]
+- *(1) Filter ((isnotnull(eventDTLocal#11L) AND (eventDTLocal#11L >= 1618704000000)) AND (eventDTLocal#11L <= 1618790399999))
   +- *(1) ColumnarToRow
      +- FileScan parquet [lat#9,lng#10,eventDTLocal#11L] Batched: true, DataFilters: [isnotnull(eventDTLocal#11L), (eventDTLocal#11L >= 1618704000000), (eventDTLocal#11L <= 161879039..., Format: Parquet, Location: InMemoryFileIndex[file:/home/werner/Java/pyspark3/dataWithUnixTime], PartitionFilters: [], PushedFilters: [IsNotNull(eventDTLocal), GreaterThanOrEqual(eventDTLocal,1618704000000), LessThanOrEqual(eventDT..., ReadSchema: struct<lat:double,lng:double,eventDTLocal:bigint>

now contains the pushed filters GreaterThanOrEqual and LessThanOrEqual for the date column.
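
If the existing file stores eventDTLocal as strings, as in the question, the column has to be converted once when rewriting the Parquet file. A sketch of that conversion (input path taken from the question, output path as above; note that unix_timestamp interprets the string in the Spark session time zone):

from pyspark.sql import functions as F

#unix_timestamp returns seconds, so multiply by 1000 to get milliseconds
spark.read.parquet("/Users/aadhithyahari/Downloads/awsfiles/part-00000-bfccec4c-7939-4f85-8fa9-5f1cb34f843a.c000.snappy.parquet") \
    .withColumn("eventDTLocal",
                F.unix_timestamp("eventDTLocal", "yyyy-MM-dd HH:mm:ss") * 1000) \
    .write.mode("overwrite").parquet("dataWithUnixTime")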

Upvotes: 3
