CSUNNY

Reputation: 454

Spark: Filters on Functions are not pushed down

I am running a Spark job (Scala) that converts a column to uppercase and filters on its values:

val udfExampleDF = spark.read.parquet("a.parquet")
udfExampleDF.filter(upper(col("device_type"))==="PHONE").select(col("device_type")).show()

I see that this filter is not pushed down. Here is what the physical plan looks like:

== Physical Plan ==
CollectLimit 21
+- *(1) Filter (upper(device_type#138) = PHONE)
   +- *(1) FileScan parquet [device_type#138] Batched: true, Format: Parquet, Location: InMemoryFileIndex[a.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<orig_device_type:string>

However, if I just filter without the upper, the filter is pushed down.

Any idea why this would be? I assumed these filters would be pushed down in all cases. Thanks for the help.

Upvotes: 1

Views: 776

Answers (1)

Ofek Hod

Reputation: 4024

Predicate pushdown tries to remove logical operators from the plan and push them down to the data source, in this case from Filter to FileScan parquet, so the pushed filter must operate on the original column value, not on a value transformed by a function such as upper.
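
You can confirm this yourself by comparing the two plans with explain(): a filter on the raw column should appear under PushedFilters in the FileScan node, while the upper() version leaves PushedFilters empty, as in your plan above. A minimal sketch, reusing the column name from the question:

import org.apache.spark.sql.functions.{col, upper}

// filter on a transformed value: expect PushedFilters to stay empty
udfExampleDF.filter(upper(col("device_type")) === "PHONE").explain()

// filter on the raw column: expect the equality condition to show up under PushedFilters
udfExampleDF.filter(col("device_type") === "PHONE").explain()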

Theoretically, if some other file format supported filtering on a transformed column value it might work, but I am not sure PushDownPredicate would support this even if such a file format existed.

To get around this, keep the column untouched and put the variation on the literal side of the comparison instead. Even though you end up with several conditions, it will be much faster (try to put the most frequent values first):

udfExampleDF.filter(col("device_type")==="Phone" or col("device_type")==="PHONE" or col("device_type")==="phone").select(col("device_type")).show()
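A slightly more compact form of the same idea is isin, which should also be translated into a filter that Parquet can push down (a sketch assuming the same three spellings; check PushedFilters in the plan to be sure on your Spark version):

udfExampleDF.filter(col("device_type").isin("Phone", "PHONE", "phone")).select(col("device_type")).show()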

You could also normalize the column value to always be PHONE during data ingestion, i.e. apply the upper function before writing the parquet files, and then simply filter like this:

udfExampleDF.filter(col("device_type")==="PHONE").select(col("device_type")).show()
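For example, the normalization could be applied once at write time (a sketch; the input and output paths here are hypothetical):

import org.apache.spark.sql.functions.{col, upper}

spark.read.parquet("raw/a.parquet")
  .withColumn("device_type", upper(col("device_type")))  // normalize once during ingestion
  .write.parquet("a.parquet")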

Upvotes: 4
