PD.

Reputation: 71

How should I configure Spark to correctly prune Hive Metastore partitions?

I'm having issues applying partition filters to Spark (v2.0.2/2.1.1) DataFrames that read from a Hive (v2.1.0) table with over 30,000 partitions. I would like to know what the recommended approach is and what, if anything, I'm doing incorrectly, as the current behaviour is a source of major performance and reliability issues.

To enable pruning, I am using the following Spark/Hive property:

--conf spark.sql.hive.metastorePartitionPruning=true

When running a query in spark-shell I can see the partition fetch take place with an invocation to ThriftHiveMetastore.Iface.get_partitions, but this unexpectedly occurs without any filtering:

val myTable = spark.table("db.table")
val myTableData = myTable
  .filter("local_date = '2017-09-01' or local_date = '2017-09-02'")
  .cache

// The HMS call invoked is:
// #get_partitions('db', 'table', -1)

If I use a simpler filter, partitions are filtered as desired:

val myTableData = myTable
  .filter("local_date = '2017-09-01'")
  .cache

// The HMS call invoked is:
// #get_partitions_by_filter(
//   'db', 'table',
//   'local_date = "2017-09-01"',
//   -1
// )

The filtering also works correctly if I rewrite the filter to use range operators instead of simply checking for equality:

val myTableData = myTable
  .filter("local_date >= '2017-09-01' and local_date <= '2017-09-02'")
  .cache

// The HMS call invoked is:
// #get_partitions_by_filter(
//   'db', 'table',
//   'local_date >= '2017-09-01' and local_date <= '2017-09-02'',
//   -1
// )

In our case, this behaviour is problematic from a performance perspective: call times are in the region of 4 minutes when unfiltered, versus 1 second when correctly filtered. Additionally, routinely loading large volumes of Partition objects onto the heap per query ultimately leads to memory issues in the metastore service.

It seems as though there is a bug in the parsing and interpretation of certain types of filter constructs; however, I've not been able to find a relevant issue in the Spark JIRA. Is there a preferred approach or specific Spark version where filters are correctly applied for all filter variants? Or must I use specific forms (e.g. range operators) when constructing filters? If so, is this limitation documented anywhere?

Upvotes: 3

Views: 3027

Answers (1)

PD.

Reputation: 71

I have not found a preferred way of querying besides rewriting the filter as described in my question above. I did find that Spark has improved its support for this, and it looks like my case is addressed in Spark 2.3.0. This is the ticket that fixes the problem I found: SPARK-20331
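For versions before 2.3.0, the range-operator rewrite can be generated rather than written by hand. The following is only a rough sketch: `PartitionFilters` and `boundedDateFilter` are hypothetical names, not part of Spark. It builds a range predicate spanning the earliest and latest requested dates and ANDs it with an exact `in` list so that dates falling in gaps are still excluded. It assumes that Spark pushes the range comparisons to the metastore independently of the `in` conjunct; I have not verified that on every version, so check which HMS call is actually invoked.

```scala
import java.time.LocalDate

object PartitionFilters {
  // Hypothetical helper (not a Spark API): returns a filter string of the form
  //   col >= 'min' and col <= 'max' and col in ('d1', 'd2', ...)
  // The range part is the form that was pushed down in the question;
  // the `in` part keeps results exact when the dates are not contiguous.
  def boundedDateFilter(column: String, dates: Seq[LocalDate]): String = {
    require(dates.nonEmpty, "at least one date is required")
    val sorted = dates.distinct.sortBy(_.toEpochDay)
    // LocalDate.toString yields ISO-8601 (yyyy-MM-dd), matching the partition values above
    val range = s"$column >= '${sorted.head}' and $column <= '${sorted.last}'"
    val exact = sorted.map(d => s"'$d'").mkString(s"$column in (", ", ", ")")
    s"$range and $exact"
  }
}
```

The result would be passed to the DataFrame in the same way as the hand-written filters, for example `myTable.filter(PartitionFilters.boundedDateFilter("local_date", dates))`. If the requested dates are far apart, the range still fetches every partition between them, so this only helps when the dates are reasonably clustered.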

Upvotes: 2
