Reputation: 711
I have a DataFrame with the columns: field1, field1_name, field3, field5, field4, field2, field6.
I select from it so that I only keep field1, field2, field3, field4. Note that there is no field5 after the select.
After that, I have a filter that uses field5, and I would expect it to throw an analysis error since the column is not there. Instead, it filters the original DataFrame (before the select) because it is pushing down the filter, as shown here:
== Parsed Logical Plan ==
'Filter ('field5 = 22)
+- Project [field1#43, field2#48, field3#45, field4#47]
   +- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv
== Analyzed Logical Plan ==
field1: string, field2: string, field3: string, field4: string
Project [field1#43, field2#48, field3#45, field4#47]
+- Filter (field5#46 = 22)
   +- Project [field1#43, field2#48, field3#45, field4#47, field5#46]
      +- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv
== Optimized Logical Plan ==
Project [field1#43, field2#48, field3#45, field4#47]
+- Filter (isnotnull(field5#46) && (field5#46 = 22))
   +- Relation[field1#43,field1_name#44,field3#45,field5#46,field4#47,field2#48,field6#49] csv
== Physical Plan ==
*Project [field1#43, field2#48, field3#45, field4#47]
+- *Filter (isnotnull(field5#46) && (field5#46 = 22))
   +- *FileScan csv [field1#43,field3#45,field5#46,field4#47,field2#48] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/..., PartitionFilters: [], PushedFilters: [IsNotNull(field5), EqualTo(field5,22)], ReadSchema: struct<field1:string,field3:string,field5:string,field4:stri...
As you can see the physical plan has the filter before the project... Is this the expected behaviour? I would expect an analysis exception instead...
A reproducible example of the issue:
val df = Seq(
  ("", "", "")
).toDF("field1", "field2", "field3")
val selected = df.select("field1", "field2")
val shouldFail = selected.filter("field3 == 'dummy'") // I was expecting this filter to fail
shouldFail.show()
Output:
+------+------+
|field1|field2|
+------+------+
+------+------+
Upvotes: 3
Views: 451
Reputation: 18485
The documentation on the Dataset/Dataframe describes the reason for what you are observing quite well:
"Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally, a Dataset represents a logical plan that describes the computation required to produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a physical plan for efficient execution in a parallel and distributed manner. "
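The important part is the last sentence of that quote. When you apply select and filter, each one is just added to a logical plan, and Spark only optimizes and executes that plan when an action is applied. At that point the Catalyst optimizer looks at the whole plan, and one of its optimization rules is to push down filters, which is what you see in your example. Note that in the plans you posted, the inner Project of the analyzed plan already carries field5#46, so the column has been resolved against the underlying relation before the optimizer runs; the optimizer then collapses the projections and pushes the filter down to the scan.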
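To see this for yourself, you can print all four plans with explain(true). The following is a minimal sketch, assuming a local SparkSession (in spark-shell, `spark` already exists and the builder line can be dropped); the sample rows are made up for illustration and are not from the question.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimenting; not needed inside spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("filter-after-select")
  .getOrCreate()
import spark.implicits._

// Illustrative data: only the first row has field3 == "dummy".
val df = Seq(
  ("a", "b", "dummy"),
  ("c", "d", "other")
).toDF("field1", "field2", "field3")

val selected = df.select("field1", "field2")
val filtered = selected.filter("field3 == 'dummy'")

// Prints the parsed, analyzed, optimized and physical plans.
filtered.explain(true)

// The result still exposes only the selected columns.
println(filtered.columns.mkString(", "))
```

The filter is applied on field3 even though the resulting columns are only field1 and field2.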
I think this is a great feature. Even though you are not interested in seeing this particular field in your final Dataframe, it understands that you are not interested in some of the original data.
That is the main benefit of the Spark SQL engine as opposed to RDDs: it understands what you are trying to do without being told how to do it.
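If you would rather get the analysis exception you were expecting, one option (a sketch, not the only way) is to refer to the column through the selected DataFrame itself instead of a string expression. selected("field3") resolves the name against the columns of `selected`, so it fails as soon as the column is looked up. The session setup and the use of Try below are assumptions for the sake of a self-contained example.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.Try

// Assumption: a local session for experimenting; not needed inside spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("strict-column-check")
  .getOrCreate()
import spark.implicits._

val df = Seq(("", "", "")).toDF("field1", "field2", "field3")
val selected = df.select("field1", "field2")

// selected("field3") is resolved against `selected`'s own output columns,
// so the lookup throws before any filter is added to the plan.
val attempt = Try(selected.filter(selected("field3") === "dummy"))

// Report the analysis error; rethrow anything unexpected.
attempt.failed.foreach {
  case e: AnalysisException => println(s"Rejected: ${e.getMessage}")
  case other                => throw other
}
```

The string form filter("field3 == 'dummy'") keeps the lenient behaviour shown in the question, so this only helps where you control how the column is referenced.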
Upvotes: 3