Reputation: 347
I have a file fileA in ORC with the following format:
key
  id_1
  id_2
value
  value_1
  ....
  value_30
If I use the following config:
'spark.sql.orc.filterPushdown' : true
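(set on the session roughly like this; a minimal sketch, assuming a SparkSession named spark)
spark.conf.set("spark.sql.orc.filterPushdown", "true")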
And my code looks like this:
val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("key.id_1")
the amount of data read will be the same as with
val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("*")
Shouldn't Spark only read the columns that are actually needed for the filter and the select?
I also checked against a similarly sized Avro file and found no improvement in selection speed.
Am I measuring ORC the wrong way?
Upvotes: 2
Views: 336
Reputation: 5963
Let's take the following reproducible example:
import spark.implicits._
import org.apache.spark.sql.functions.{col, lit}

// Build a small DataFrame with two anonymous struct columns
val df = Seq(
  ((1, 1), (2, 2)),
  ((1, 1), (2, 2)),
  ((1, 1), (2, 2))
).toDF("key", "value")

// Cast the structs so the nested fields get the names from the question
val keySchema = "struct<id_1:int,id_2:int>"
val valueSchema = "struct<value_1:int,value_2:int>"
val input = df.select(col("key").cast(keySchema), col("value").cast(valueSchema))
scala> input.show
+------+------+
| key| value|
+------+------+
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
+------+------+
input.write.mode("overwrite").orc("myFile.orc")
If we now read this file back, apply the filters from your question, and use the explain
method, we see the following:
val output = spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))
  .select("key.id_1")
scala> output.explain
== Physical Plan ==
*(1) Project [key#68.id_1 AS id_1#73]
+- *(1) Filter (isnotnull(value#69) AND (value#69.value_1 > 1))
+- FileScan orc [key#68,value#69] Batched: false, DataFilters: [isnotnull(value#69), (value#69.value_1 > 1)], Format: ORC, Location: InMemoryFileIndex[file:/C:/Users/(C)KurtHoman/myFile.orc], PartitionFilters: [], PushedFilters: [IsNotNull(value), GreaterThan(value.value_1,1)], ReadSchema: struct<key:struct<id_1:int>,value:struct<value_1:int>>
We see that there are some DataFilters/PushedFilters at work, so predicate pushdown is working. If you really want to avoid reading full files, you need to make sure your input file is properly partitioned (see the sketch below). Some more info about those filters here.
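For illustration, partition pruning could look roughly like this (a hedged sketch, not from the original answer; the choice of id_1 as partition column and the path myFilePartitioned.orc are just assumptions for this example):

// Pull the nested field up to a top-level column and partition by it, so a
// filter on that column can prune whole directories (PartitionFilters)
// instead of relying only on ORC-level pushed filters.
input
  .withColumn("id_1", col("key.id_1"))
  .write
  .mode("overwrite")
  .partitionBy("id_1")
  .orc("myFilePartitioned.orc")

spark.read.orc("myFilePartitioned.orc")
  .filter(col("id_1") > 1)
  .explain
// The physical plan should now list the predicate under PartitionFilters,
// so non-matching partitions are not read at all.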
Now, we do indeed see that both the key and the value columns are being read in, but that is because a PushedFilter alone does not guarantee that you never read a value for which the filter predicate is false; it just applies a prefilter at the file level (more info in this SO answer). So we actually have to apply that filter in our Spark DAG as well (which you see in the output of explain).
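If you want to measure the effect rather than only inspect the plan, one rough approach (a sketch, not part of the original answer; exact savings depend on file and stripe layout) is to total the input bytes with a SparkListener and compare runs with spark.sql.orc.filterPushdown enabled and disabled:

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulate the bytes read by all tasks of the queries we run next.
val bytesRead = new AtomicLong(0L)
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    Option(taskEnd.taskMetrics).foreach(m => bytesRead.addAndGet(m.inputMetrics.bytesRead))
  }
})

spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))
  .select("key.id_1")
  .count()
// Compare bytesRead.get() with a second run after setting the flag to "false".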
So, to wrap it up: both the key and value columns have to be read in. This is because your filter operation requires the value column, the final column you're interested in is the key column, and the PushedFilter alone does not guarantee your predicate to be true.

Upvotes: 2