Tomas Bartalos

Reputation: 1276

Efficiently reading a nested Parquet column in Spark

I have the following (simplified) schema:

root
 |-- event: struct (nullable = true)
 |    |-- spent: struct (nullable = true)
 |    |    |-- amount: decimal(34,3) (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |
 |    | ... ~ 20 other struct fields on "event" level

I'm trying to sum over a nested field:

spark.sql("select sum(event.spent.amount) from event")

According to Spark metrics I'm reading 18 GB from disk and it takes 2.5 minutes.

However, when I select a top-level field:

spark.sql("select sum(amount) from event")

I read only 2 GB in 4 seconds.

From the physical plan I can see that in the nested case the whole event struct, with all of its fields, is read from Parquet, which is a waste.
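For reference, this is how the plan can be inspected (a minimal sketch):

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// the Parquet scan's ReadSchema entry shows which columns are materialized.
spark.sql("select sum(event.spent.amount) from event").explain(true)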

The Parquet format should be able to provide the desired column from a nested structure without reading all of it (which is the whole point of a columnar store). Is there some way to do this efficiently in Spark?

Upvotes: 9

Views: 4257

Answers (1)

Tomas Bartalos

Reputation: 1276

Solution:

spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)")

The query must be written in sub-select fashion: you can't wrap the selected column directly in an aggregate function. The following query will break schema pruning:

select sum(event.spent.amount) as amount from event

The nested schema pruning work is tracked in SPARK-4502.
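The same sub-select shape should also work through the DataFrame API (a sketch, assuming the table is registered as event_archive like above):

import org.apache.spark.sql.functions.{col, sum}

spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

// Project the nested column into a flat alias first, then aggregate,
// so nested schema pruning can apply to the Parquet scan.
val amounts = spark.table("event_archive")
  .select(col("event.spent.amount").as("amount"))
amounts.agg(sum(col("amount"))).show()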

A dirty workaround is to specify a "projected schema" at load time:

import org.apache.spark.sql.types._

// The projected type must match the type stored in the file: decimal(34,3)
val amountType = DataTypes.createDecimalType(34, 3)
val schema = StructType(
  StructField("event", StructType(
    StructField("spent", StructType(
      StructField("amount", amountType, true) :: Nil
    ), true) :: Nil
  ), true) :: Nil
)
val df = spark.read.format("parquet").schema(schema).load(<path>)
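Since the loaded schema now contains only the single leaf column, the scan can't read more than that, so even the "wrapped" aggregate stays cheap here (a quick sketch using the df from above):

// Only event.spent.amount exists in df's schema, so the Parquet scan
// reads just that leaf column regardless of how the query is written.
df.selectExpr("sum(event.spent.amount)").show()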

Upvotes: 7
