olaf

Reputation: 347

Reading ORC does not trigger projection pushdown and predicate pushdown

I have a file, fileA, in ORC format with the following schema:

key
    id_1
    id_2
value
    value_1
    ...
    value_30

If I use the following config:

'spark.sql.orc.filterPushdown'                : true
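For reference, a minimal sketch of how this option can be set on a SparkSession (the app name is just a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-pushdown-test")                    // placeholder app name
  .config("spark.sql.orc.filterPushdown", "true")  // enable ORC predicate pushdown
  .getOrCreate()

// or on an existing session:
spark.conf.set("spark.sql.orc.filterPushdown", "true")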

And my code looks like this:

val filter_A = fileA_DF  // fileA_DF is the DataFrame read from the ORC file
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("key.id_1")

the amount of data read will be the same as with

val filter_A = fileA_DF
  .filter(fileA_DF("value.value_1") > lit(some_value))
  .select("*")

Shouldn't Spark only

  1. predicate pushdown - read the files and stripes that satisfy the filter
  2. projection pushdown - read only the columns that are used?

I also checked with a similarly sized Avro file and found no improvement in selection speed.

Am I measuring ORC the wrong way?

Upvotes: 2

Views: 336

Answers (1)

Koedlt

Reputation: 5963

Let's take the following reproducible example:

// These imports are implicit in spark-shell; they are needed when running as an application.
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val df = Seq(
  ((1, 1), (2, 2)),
  ((1, 1), (2, 2)),
  ((1, 1), (2, 2))
).toDF("key", "value")

val keySchema = "struct<id_1:int,id_2:int>"
val valueSchema = "struct<value_1:int,value_2:int>"

val input = df.select(col("key").cast(keySchema), col("value").cast(valueSchema))

scala> input.show
+------+------+
|   key| value|
+------+------+
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
|{1, 1}|{2, 2}|
+------+------+

input.write.mode("overwrite").orc("myFile.orc")

If we now read this file back, apply the kind of filter that you apply, and use the explain method, we see the following:

val output = spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))
  .select("key.id_1")

scala> output.explain
== Physical Plan ==
*(1) Project [key#68.id_1 AS id_1#73]
+- *(1) Filter (isnotnull(value#69) AND (value#69.value_1 > 1))
   +- FileScan orc [key#68,value#69] Batched: false, DataFilters: [isnotnull(value#69), (value#69.value_1 > 1)], Format: ORC, Location: InMemoryFileIndex[file:/C:/Users/(C)KurtHoman/myFile.orc], PartitionFilters: [], PushedFilters: [IsNotNull(value), GreaterThan(value.value_1,1)], ReadSchema: struct<key:struct<id_1:int>,value:struct<value_1:int>>

We see that there are some DataFilters/PushedFilters at work, so predicate pushdown is working. If you really want to avoid reading full files, you need to make sure your input data is properly partitioned. Some more info about those filters can be found here.
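As an illustration (not from the original example above), a minimal sketch of one way to do that: pull the filter field up to a top-level column and partition the output on it, so that a filter on that column becomes a PartitionFilter and non-matching directories are skipped entirely. The extra value_1 column and the output path are assumptions made for this sketch.

// Sketch only: partitionBy cannot use a nested field directly, so promote it first.
val partitioned = input.withColumn("value_1", col("value.value_1"))

partitioned.write
  .mode("overwrite")
  .partitionBy("value_1")          // one directory per distinct value_1
  .orc("myFilePartitioned.orc")    // hypothetical output path

// A filter on the partition column now shows up under PartitionFilters in explain,
// and directories that don't match are not read at all.
spark.read.orc("myFilePartitioned.orc")
  .filter(col("value_1") > 1)
  .select("key.id_1")
  .explain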

Now, we do indeed see that both the key and the value column are being read in, but that is because a PushedFilter alone does not guarantee that no rows for which the predicate is false are read: it only applies a pre-filter at the file/stripe level (more info in this SO answer). So we still have to apply that filter in our Spark DAG as well (which you see in the output of explain).

So, to wrap it up:

  • Your filter predicates are being pushed down. If you want full files not to be read at all, make sure to partition your data accordingly before reading it in.
  • In this case, both your key and value columns have to be read in: your filter operation needs the value column, the column you are ultimately interested in lives in the key column, and the PushedFilter alone does not guarantee that the predicate holds for every row that is read. Note that the ReadSchema in the plan above is already pruned down to id_1 and value_1, so projection pushdown on the nested fields is working; a quick way to check that is sketched below.
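As an illustration (building on the example above, not part of the original answer), you can compare the ReadSchema that explain reports for the pruned select and for select("*"):

// Illustration: check which leaf fields the FileScan actually reads.
val base = spark.read.orc("myFile.orc")
  .filter(col("value.value_1") > lit(1))

// ReadSchema should list only key.id_1 and value.value_1 here (nested schema pruning)...
base.select("key.id_1").explain

// ...while selecting everything puts the full key and value structs into ReadSchema.
base.select("*").explain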

Upvotes: 2
