Selnay

Reputation: 711

Spark is pushing down a filter even when the column is not in the DataFrame

I have a DataFrame with the columns:

field1, field1_name, field3, field5, field4, field2, field6

I select from it so that I only keep field1, field2, field3, and field4. Note that there is no field5 after the select.

After that, I apply a filter that uses field5. I would expect it to throw an analysis error, since the column is no longer there, but instead Spark pushes the filter down and filters the original DataFrame (before the select), as shown here:

== Parsed Logical Plan ==
'Filter ('field5 = 22)
+- Project [field1#43, field2#48, field3#45, field4#47]
   +- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv

== Analyzed Logical Plan ==
field1: string, field2: string, field3: string, field4: string
Project [field1#43, field2#48, field3#45, field4#47]
+- Filter (field5#46 = 22)
   +- Project [field1#43, field2#48, field3#45, field4#47, field5#46]
      +- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv

== Optimized Logical Plan ==
Project [field1#43, field2#48, field3#45, field4#47]
+- Filter (isnotnull(field5#46) && (field5#46 = 22))
   +- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv

== Physical Plan ==
*Project [field1#43, field2#48, field3#45, field4#47]
+- *Filter (isnotnull(field5#46) && (field5#46 = 22))
   +- *FileScan csv [field1#43,field3#45,field5#46,field4#47,field2#48] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/..., PartitionFilters: [], PushedFilters: [IsNotNull(field5), EqualTo(field5,22)], ReadSchema: struct<field1:string,field3:string,field5:string,field4:stri...

As you can see, the physical plan applies the filter before the project... Is this the expected behaviour? I would expect an analysis exception instead...

A reproducible example of the issue:

import spark.implicits._ // needed for toDF; assumes a SparkSession named spark

val df = Seq(
  ("", "", "")
).toDF("field1", "field2", "field3")

val selected = df.select("field1", "field2")
val shouldFail = selected.filter("field3 == 'dummy'") // I was expecting this filter to fail
shouldFail.show()

Output:

+------+------+
|field1|field2|
+------+------+
+------+------+

Upvotes: 3

Views: 451

Answers (1)

Michael Heil

Reputation: 18485

The Dataset/DataFrame documentation describes the reason for what you are observing quite well:

"Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner. "

The important part is that computations are only triggered when an action is invoked. When you apply select and filter statements, they are simply added to a logical plan, which Spark only evaluates once an action is invoked. When evaluating this full logical plan, the Catalyst Optimizer looks at the whole plan, and one of its optimization rules is to push down filters, which is what you see in your example.
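As a small illustration (a sketch reusing the reproducible example from the question, assuming a spark-shell style session where spark.implicits._ is available): building the transformations runs no job, explain(true) prints the same sequence of plans you posted, and show() is the action that actually executes the optimized plan.

import spark.implicits._

val df = Seq(("", "", "")).toDF("field1", "field2", "field3")

// These transformations only build up a logical plan; nothing is executed yet.
val selected = df.select("field1", "field2")
val shouldFail = selected.filter("field3 == 'dummy'")

// Prints the parsed, analyzed, optimized and physical plans,
// including the filter that has been pushed below the project.
shouldFail.explain(true)

// show() is the action that triggers execution of the optimized plan.
shouldFail.show()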

I think this is a great feature. Even though you are not interested in seeing this particular field in your final DataFrame, Spark understands that you are not interested in some of the original data and filters it out as early as possible.

That is the main benefit of the Spark SQL engine as opposed to RDDs: it understands what you are trying to do without being told how to do it.
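As a side note (a sketch reusing the selected DataFrame from the question's snippet): if you want the missing column to fail eagerly, reference it through the Dataset itself rather than through a string expression; a Column obtained that way is, as far as I understand, resolved against that Dataset's schema only.

// The string expression is resolved against the underlying plan, so this succeeds:
selected.filter("field3 == 'dummy'").show()

// A Column taken from `selected` only sees field1 and field2, so this should
// throw an AnalysisException ("Cannot resolve column name \"field3\" among (field1, field2)"):
selected.filter(selected("field3") === "dummy")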

Upvotes: 3
