Punter Vicky
Punter Vicky

Reputation: 17012

reduce the amount of data scanned by Athena when using aggregate functions

The below query scans 100 mb of data.

select * from table where column1 = 'val' and partition_id = '20190309';

However the below query scans 15 GB of data (there are over 90 partitions)

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

How can I optimize the second query to scan the same amount of data as the first?

Upvotes: 4

Views: 3897

Answers (3)

Dain Sundstrom
Dain Sundstrom

Reputation: 2858

There are two problems here. The efficiency of the the scalar subquery above select max(partition_id) from table, and the one @PiotrFindeisen pointed out around dynamic filtering.

The the first problem is that queries over the partition keys of a Hive table are a lot more complex than they appear. Most folks would think that if you want the max value of a partition key, you can simply execute a query over the partition keys, but that doesn't work because Hive allows partitions to be empty (and it also allows non-empty files that contain no rows). Specifically, the scalar subquery above select max(partition_id) from table requires Trino (formerly PrestoSQL) to find the max partition containing at least one row. The ideal solution would be to have perfect stats in Hive, but short of that the engine would need to have custom logic for hive that open files of the partitions until it found a non empty one.

If you are are sure that your warehouse does not contain empty partitions (or if you are ok with the implications of that), you can replace the scalar sub query with one over the hidden $partitions table"

select * 
from table 
where column1 = 'val' and 
    partition_id = (select max(partition_id) from "table$partitions");

The second problem is the one @PiotrFindeisen pointed out, and has to do with the way that queries are planned an executed. Most people would look at the above query, see that the engine should obviously figure out the value of select max(partition_id) from "table$partitions" during planning, inline that into the plan, and then continue with optimization. Unfortunately, that is a pretty complex decision to make generically, so the engine instead simply models this as a broadcast join, where one part of the execution figures out that value, and broadcasts the value to the rest of the workers. The problem is the rest of the execution has no way to add this new information into the existing processing, so it simply scans all of the data and then filters out the values you are trying to skip. There is a project in progress to add this dynamic filtering, but it is not complete yet.

This means the best you can do today, is to run two separate queries: one to get the max partition_id and a second one with the inlined value.

BTW, the hidden "$partitions" table was added in Presto 0.199, and we fixed some minor bugs in 0.201. I'm not sure which version Athena is based on, but I believe it is is pretty far out of date (the current release at the time I'm writing this answer is 309.

Upvotes: 10

Tanuj Kumar
Tanuj Kumar

Reputation: 21

I don't know if it is still relevant, but just found out:

Instead of:

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

Use:

select a.* from table a 
inner join (select max(partition_id) max_id from table) b on a.partition_id=b.max_id
where column1 = 'val';

I think it has something to do with optimizations of joins to use partitions.

Upvotes: 2

Andrew
Andrew

Reputation: 1998

EDIT: Presto removed the __internal_partitions__ table in their 0.193 release so I'd suggest not using the solution defined in the Slow aggregation queries for partition keys section below in any production systems since Athena 'transparently' updates presto versions. I ended up just going with the naive SELECT max(partition_date) ... query but also using the same lookback trick outlined in the Lack of Dynamic Filtering section. It's about 3x slower than using the __internal_partitions__ table, but at least it won't break when Athena decides to update their presto version.

----- Original Post -----

So I've come up with a fairly hacky way to accomplish this for date-based partitions on large datasets for when you only need to look back over a few partitions'-worth of data for a match on the max, however, please note that I'm not 100% sure how brittle the usage of the information_schema.__internal_partitions__ table is.

As @Dain noted above, there are really two issues. The first being how slow an aggregation of the max(partition_date) query is, and the second being Presto's lack of support for dynamic filtering.

Slow aggregation queries for partition keys

To solve the first issue, I'm using the information_schema.__internal_partitions__ table which allows me to get quick aggregations on the partitions of a table without scanning the data inside the files. (Note that partition_value, partition_key, and partition_number in the below queries are all column names of the __internal_partitions__ table and not related to your table's columns)

If you only have a single partition key for your table, you can do something like:

SELECT max(partition_value) FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'

But if you have multiple partition keys, you'll need something more like this:

SELECT max(partition_date) as latest_partition_date from (
  SELECT max(case when partition_key = 'partition_date' then partition_value end) as partition_date, max(case when partition_key = 'another_partition_key' then partition_value end) as another_partition_key
  FROM information_schema.__internal_partitions__
  WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'
  GROUP BY partition_number
)
WHERE
  -- ... Filter down by values for e.g. another_partition_key
)

These queries should run fairly quickly (mine run in about 1-2 seconds) without scanning through the actual data in the files, but again, I'm not sure if there are any gotchas with using this approach.

Lack of Dynamic Filtering

I'm able to mitigate the worst effects of the second problem for my specific use-case because I expect there to always be a partition within a finite amount of time back from the current date (e.g. I can guarantee any data-production or partition-loading issues will be remedied within 3 days). It turns out that Athena does do some pre-processing when using presto's datetime functions, so this does not have the same types of issues with Dynamic Filtering as using a sub-query.

So you can change your query to limit how far it will look back for the actual max using the datetime functions so that the amount of data scanned will be limited.

SELECT * FROM "DATABASE_NAME"."TABLE_NAME"
WHERE partition_date >= cast(date '2019-06-25' - interval '3' day as varchar) -- Will only scan partitions from 3 days before '2019-06-25'
AND partition_date = (
  -- Insert the partition aggregation query from above here
)

Upvotes: 2

Related Questions