Nitin Kumar

Reputation: 249

Spark Predicate Pushdown Not Working As Expected

I am having a problem with Spark's predicate pushdown behaviour: the pushed filters are not what I expect. I am using Spark 2.4.5 on macOS.

My sample CSV data is in results2.csv (shown as an image in the original post).

val df = spark.read.option("header", "true").csv("/Users/apple/kaggle-data/results2.csv")

Partitioned on two columns, country and city:

df.repartition($"country",$"city").write.option("header", "true").partitionBy("country","city").parquet("/Users/apple/kaggle-data/part2/")

Partitioned on one column, country:

val df2 = spark.read.option("header", "true").csv("/Users/apple/kaggle-data/results2.csv")
df2.repartition($"country").write.option("header", "true").partitionBy("country").parquet("/Users/apple/kaggle-data/part1/")

I read the data partitioned only on country and filter on both country and city. PushedFilters shows city, which I did not expect; I was expecting country to be there.

val kaggleDf1 = spark.read.option("header", "true").parquet("/Users/apple/kaggle-data/part1/") 
kaggleDf1.where($"country" === "England" && $"city" === "London").explain(true)

The plan:

== Parsed Logical Plan ==
'Filter (('country = England) && ('city = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, city: string, neutral: string, country: string
Filter ((country#146 = England) && (city#144 = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(country#146) && isnotnull(city#144)) && (country#146 = England)) && (city#144 = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Physical Plan ==
*(1) Project [date#138, home_team#139, away_team#140, home_score#141, away_score#142, tournament#143, city#144, neutral#145, country#146]
+- *(1) Filter (isnotnull(city#144) && (city#144 = London))
   +- *(1) FileScan parquet [date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part1], PartitionCount: 1, PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: [IsNotNull(city), EqualTo(city,London)]***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...

I read the data partitioned only on country and filter on country alone. PushedFilters is empty, which I did not expect; I was expecting country to be there.

kaggleDf1.where($"country" === "England").explain(true)

The plan:

== Parsed Logical Plan ==
'Filter ('country = England)
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, city: string, neutral: string, country: string
Filter (country#146 = England)
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Optimized Logical Plan ==
Filter (isnotnull(country#146) && (country#146 = England))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Physical Plan ==
*(1) FileScan parquet [date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part1], PartitionCount: 1, PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: []***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...

I read the data partitioned on country and city and filter on both country and city. PushedFilters is empty, which I did not expect; I was expecting country and city to be there.

val kaggleDf2 = spark.read.option("header", "true").parquet("/Users/apple/kaggle-data/part2/")
kaggleDf2.where($"country" === "England" && $"city" === "London").explain(true)

The plan:

== Parsed Logical Plan ==
'Filter (('country = England) && ('city = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet

== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, neutral: string, country: string, city: string
Filter ((country#165 = England) && (city#166 = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(country#165) && isnotnull(city#166)) && (country#165 = England)) && (city#166 = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet

== Physical Plan ==
*(1) FileScan parquet [date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part2], PartitionCount: 1, PartitionFilters: [isnotnull(country#165), isnotnull(city#166), (country#165 = England), (city#166 = London)], ***PushedFilters: []***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...

Can anyone help me understand what is wrong here? Am I missing something?

Upvotes: 4

Views: 1798

Answers (2)

Belwal

Reputation: 473

This is because of PartitionFilters, and the behaviour is expected.

When data is written to Parquet with partitionBy and a query filters on a partition column, Spark reads only the sub-directories that match the partition filters. It does not need to apply that filter to the data again, so those columns do not appear in PushedFilters at all.

Now in your case:

kaggleDf1.where($"country" === "England" && $"city" === "London")
PartitionFilters: [isnotnull(country#146), (country#146 = England)]
PushedFilters: [IsNotNull(city), EqualTo(city,London)]

Spark reads only the files under the country=England directory (because your data was partitioned by country on write), so it does not need to apply that filter to the rows again. You will not find this filter anywhere except in PartitionFilters.
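
To see this split without reading the whole explain output, here is a minimal sketch, assuming a local Spark session and a tiny made-up dataset in place of the Kaggle file. The names sample, outDir and scan are hypothetical, and FileSourceScanExec is an internal Spark class, so its fields may differ between versions.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("pushdown-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in rows, not the data from the question.
val sample = Seq(
  ("England", "London", "Friendly"),
  ("England", "Manchester", "Friendly"),
  ("Scotland", "Glasgow", "Friendly")
).toDF("country", "city", "tournament")

// Write partitioned by country only, like part1 in the question.
val outDir = Files.createTempDirectory("part1-demo").toString
sample.write.mode("overwrite").partitionBy("country").parquet(outDir)

val query = spark.read.parquet(outDir)
  .where($"country" === "England" && $"city" === "London")

// Pull the file scan node out of the physical plan.
val scan = query.queryExecution.executedPlan
  .collect { case s: FileSourceScanExec => s }
  .head

// Filters on partition columns: used to choose which directories to read.
println(s"partitionFilters = ${scan.partitionFilters}")
// Filters on ordinary columns: the candidates for pushdown to Parquet.
println(s"dataFilters      = ${scan.dataFilters}")
println(s"PushedFilters    = ${scan.metadata.getOrElse("PushedFilters", "<none>")}")
```

The country predicate should show up only under partitionFilters and the city predicate only under dataFilters, which matches the plan in the question.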

Upvotes: 2

Ged

Reputation: 18003

I think you are misinterpreting the plan.

I read data with partition only on country and query on predicate country and city , but the pushdown filter shows city which is not expected, i was expecting country to be here.

PartitionFilters is used for partition pruning. Pushdown means the filters are pushed to the data source rather than being evaluated after the data has been brought into Spark, although you can disable that. Both exist for performance reasons.

Filtering at the source has two aspects. The partition filter allows only the matching partitions to be read, which saves on scanning; then, within that partition or those partitions, the city filter is applied. Parquet is columnar as well.
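
As a rough sketch of the "you can disable that" part: for Parquet, the relevant setting is spark.sql.parquet.filterPushdown, which is on by default. The snippet below only toggles it and restores it; the names key, before and during are made up for illustration. As far as I know, partition pruning is not governed by this setting, and I would not rely on the PushedFilters entry in the explain output changing when it is turned off.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("pushdown-toggle").getOrCreate()

val key = "spark.sql.parquet.filterPushdown"

// Remember the current value so it can be restored afterwards.
val before = spark.conf.get(key)

// Turn Parquet filter pushdown off for this session.
spark.conf.set(key, "false")
val during = spark.conf.get(key)

// ... run the query here and compare timings or results ...

// Restore the original value.
spark.conf.set(key, before)
println(s"$key: $before -> $during -> ${spark.conf.get(key)}")
```

The query results should be the same either way; the setting only changes where the city filter is evaluated.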

...PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: [IsNotNull(city), EqualTo(city,London)]***...

So there is no issue; the expectation needs to be aligned, that's all. You should be able to work out the second case now.
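
For the remaining cases, where every predicate is on a partition column, here is a minimal sketch under the same kind of assumptions: a local session and a tiny made-up dataset. The names rows, partDir and scan2 are hypothetical, and FileSourceScanExec is an internal class whose fields may differ between Spark versions.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("pushdown-demo-2").getOrCreate()
import spark.implicits._

// Hypothetical stand-in rows, not the data from the question.
val rows = Seq(
  ("England", "London", "Friendly"),
  ("England", "Manchester", "Friendly"),
  ("Scotland", "Glasgow", "Friendly")
).toDF("country", "city", "tournament")

// Write partitioned by both columns, like part2 in the question.
val partDir = Files.createTempDirectory("part2-demo").toString
rows.write.mode("overwrite").partitionBy("country", "city").parquet(partDir)

val both = spark.read.parquet(partDir)
  .where($"country" === "England" && $"city" === "London")

val scan2 = both.queryExecution.executedPlan
  .collect { case s: FileSourceScanExec => s }
  .head

// Both predicates are on partition columns, so they are handled by directory pruning.
println(s"partitionFilters = ${scan2.partitionFilters}")
// Nothing is left over to push into the Parquet reader.
println(s"dataFilters      = ${scan2.dataFilters}")
```

With nothing left in dataFilters, an empty PushedFilters list is what you would expect to see in the plan.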

Upvotes: 1
