Reputation: 454
I am running a Spark job (Scala) that converts a column to upper case and filters on its values:
import org.apache.spark.sql.functions.{col, upper}

val udfExampleDF = spark.read.parquet("a.parquet")
udfExampleDF.filter(upper(col("device_type"))==="PHONE").select(col("device_type")).show()
I see that this filter is not pushed down. Here is what the physical plan looks like:
== Physical Plan ==
CollectLimit 21
+- *(1) Filter (upper(device_type#138) = PHONE)
+- *(1) FileScan parquet [device_type#138] Batched: true, Format: Parquet, Location: InMemoryFileIndex[a.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<orig_device_type:string>
However, if I just filter without the upper, the filter is pushed down.
Any idea why this is? I assumed these filters would be pushed down in all cases. Thanks for the help.
Upvotes: 1
Views: 776
Reputation: 4024
Predicate pushdown removes logical operators and pushes them down to the data source, in our case from the Filter into the Parquet FileScan, so it must use the original column value. An expression such as upper(device_type) is not a raw column, which is why the comparison stays in the Filter operator instead of appearing under PushedFilters.
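One way to see the limitation (this detail is my own illustration, not from the question): pushed predicates are expressed with Spark's data-source Filter API in org.apache.spark.sql.sources, which only pairs a plain column name with a literal value, so there is simply no slot for an expression like upper(device_type). A minimal sketch:

import org.apache.spark.sql.sources.{EqualTo, Filter}

// A pushed predicate only carries an attribute name and a literal value.
val pushed: Filter = EqualTo("device_type", "PHONE")

// There is no way to encode upper(device_type) = "PHONE" with this API,
// so Catalyst keeps that comparison in the Filter operator instead.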
Theoretically, if another file format supported filtering on an altered column value this might work (I'm not sure PushDownPredicate would support it even if such a file format existed).
To get around this, put the dynamic part on the literal side of the comparison. Although you end up with several conditions, it will be much faster because the filter can be pushed down (try to put the most frequent values in the first conditions):
udfExampleDF.filter(col("device_type")==="Phone" or col("device_type")==="PHONE" or col("device_type")==="phone").select(col("device_type")).show()
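If you prefer, the same predicate can be written with isin; as far as I know Spark translates it into an In source filter, which Parquet pushdown also supports, so it should behave the same:

// Same predicate written with isin; it shows up under PushedFilters as an In filter.
udfExampleDF.filter(col("device_type").isin("Phone", "PHONE", "phone")).select(col("device_type")).show()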
You could also normalize the column value to always be PHONE during data ingestion, i.e. apply the upper function before writing the parquet files, and then simply filter like this:
udfExampleDF.filter(col("device_type")==="PHONE").select(col("device_type")).show()
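For the ingestion side, a minimal sketch of that normalization step (the raw input path here is just a placeholder):

import org.apache.spark.sql.functions.{col, upper}

// Hypothetical ingestion job: upper-case the column once, before writing,
// so later reads can filter on a plain literal and benefit from pushdown.
spark.read.parquet("raw_devices.parquet")
  .withColumn("device_type", upper(col("device_type")))
  .write
  .mode("overwrite")
  .parquet("a.parquet")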
Upvotes: 4