Reputation: 1055
I have a dataset in S3 and am querying it via Athena. A new version of the partitions within the dataset is written every x minutes (Boto3/Lambda). Ultimately I need to efficiently query the 'latest' version of the data via Athena SQL.
Strategy so far, which is working, but has 'issues' ....
Now all this works, but I am concerned about the switchover time. Using Workbench/J (may be less when in Lambda, but same principle)
So that is at least 2 seconds where my data set will produce incorrect results. Or if I do the 'current=No' first, and then add the new partition, 2 seconds where there will be no current data.
Doesn't sound a lot, but
So the question: is there a better way of achieving this? I want the ability to select the 'latest' version of the data without complex SQL to determine max(stamp) for each partition in a subquery.
-- first partition (earlier date)
ALTER TABLE mydb.getresources
ADD PARTITION (ac='123456789012', `current` = 'Yes', stamp='2020-09-25T07_44_50.301984', region='us-east-1')
LOCATION 's3://mybucket/api/resourcegroupstaggingapi/getresources/ac=123456789012/stamp=2020-09-25T07_44_50.301984/region=us-east-1/'
-- runtime 2.26s
-- second partition (later date)
ALTER TABLE mydb.getresources
ADD PARTITION (ac='123456789012', `current` = 'Yes', stamp='2020-09-25T08_02_50.925047', region='us-east-1')
LOCATION 's3://mybucket/api/resourcegroupstaggingapi/getresources/ac=123456789012/stamp=2020-09-25T08_02_50.925047/region=us-east-1/'
-- runtime 1.18s
-- rename old
ALTER TABLE mydb.getresources
PARTITION (ac='123456789012', `current` = 'Yes', stamp='2020-09-25T07_44_50.301984', region='us-east-1')
RENAME TO PARTITION (ac='123456789012', `current` = 'No', stamp='2020-09-25T07_44_50.301984', region='us-east-1');
-- runtime 1.51s
Upvotes: 2
Views: 2641
Reputation: 388
These solutions are really awesome, folks! I would also suggest a more far-reaching approach. It means adopting a storage-maintenance library as your new merging engine, but it is really useful to take a look at things like Delta Lake and Hudi, as versioning and point-in-time querying are sorted out of the box.
Upvotes: 0
Reputation: 876
You can try the following workaround. It assumes you don't update partitions very frequently, maybe no more often than once every 5 minutes (debatable).
Don't update the partitions on the existing table.
Let's say you have a table A1 with partitions x0, x1, x2, x3 ... xn.
First, instead of querying the table directly, query a view, let's say view_A, whose definition is as simple as
CREATE OR REPLACE VIEW view_A AS SELECT * FROM db.A1
Now, if you need to add a new partition, delete an existing one, or both, do as follows:
Create a new empty table A2 with the same schema as A1 using a CTAS query. A2 will hold some {xi} partitions from A1 plus the new partitions y1, y2 ... ym. Add the partitions to A2 with ALTER statements (all the partitions that you need from A1 and any new partitions that need to be added) or, as @Theo mentioned, use the Glue API (BatchUpdatePartition).
Now update your view definition for view_A to point to the new table
CREATE OR REPLACE VIEW view_A AS SELECT * FROM db.A2
I don't see any issue now when you query from the view. Once A2 is ready, you switch the view to point to A2.
Once the view is updated, you can drop the old tables (like A1); dropping the table in Athena does not delete the underlying data.
Note: None of this should incur any additional cost since they are all DDL commands and won't scan through any data.
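As a rough illustration of the sequence above, here is a minimal Python sketch that only builds the DDL strings: one `ALTER TABLE ... ADD PARTITION` per partition of the new table, followed by the view switch as the very last statement. The function name `view_swap_statements` and its argument shapes are hypothetical; actually running the statements (for example through Athena) is left out.

```python
def view_swap_statements(view, database, new_table, partitions):
    """Build the DDL that fills `new_table` and then repoints `view` at it.

    `partitions` is a list of (spec, location) tuples, where `spec` is a
    dict of partition key -> value and `location` is the S3 prefix.
    This helper is illustrative only; it executes nothing.
    """
    statements = []
    for spec, location in partitions:
        # Backticks guard partition keys that are reserved words.
        keys = ", ".join(f"`{key}` = '{value}'" for key, value in spec.items())
        statements.append(
            f"ALTER TABLE {database}.{new_table} "
            f"ADD IF NOT EXISTS PARTITION ({keys}) LOCATION '{location}'"
        )
    # Repoint the view last, once every partition has been registered.
    statements.append(
        f"CREATE OR REPLACE VIEW {view} AS SELECT * FROM {database}.{new_table}"
    )
    return statements


if __name__ == "__main__":
    location = (
        "s3://mybucket/api/resourcegroupstaggingapi/getresources/"
        "ac=123456789012/stamp=2020-09-25T08_02_50.925047/region=us-east-1/"
    )
    spec = {
        "ac": "123456789012",
        "stamp": "2020-09-25T08_02_50.925047",
        "region": "us-east-1",
    }
    for statement in view_swap_statements("view_A", "db", "A2", [(spec, location)]):
        print(statement)
```

Queries against view_A keep resolving to A1 until the final statement runs, so readers should never see a half-populated A2.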
Upvotes: 1
Reputation: 132862
There are unfortunately no atomic operations in the Glue Data Catalog API, which Athena uses for storing metadata about tables and partitions. There is no way to modify multiple partitions at the same time in a transactional way.
However, there are things you can do to shorten the duration where you may have inconsistencies: use the Glue Data Catalog API directly instead of Athena's SQL interface. Going through Athena is much slower than using the API directly.
Using the CreatePartition API call you can add the new partition, and with UpdatePartition you can modify the previous partition. This corresponds to what you currently do, but will leave a shorter duration where there are two partitions marked as current.
You can do one better by using BatchUpdatePartition: by adding the new partition with the current flag set to false and then batch-updating the current and previous partitions to swap the flag, you can get the duration down to about as short as it can be. Even so, there is no guarantee that a query will not see two current partitions or no current partition; as far as I understand it, the API is not guaranteed to be atomic.
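A minimal sketch of the batch swap, assuming a Boto3 Glue client is passed in as `glue` and that the table's partition keys are ordered (ac, current, stamp, region), so that `current` sits at index 1. The helper names are hypothetical. The partition dicts are assumed to have the shape of the `Partition` element returned by `get_partition`, and carrying the existing `StorageDescriptor` over into the update is likewise an assumption about what is needed to keep the location intact.

```python
def swap_current_entries(old_partition, new_partition, current_index=1):
    """Build the Entries list for Glue BatchUpdatePartition.

    The old partition is flipped to 'No' and the new one to 'Yes'.
    `current_index` is the assumed position of `current` among the
    table's partition keys.
    """
    def entry(partition, flag):
        new_values = list(partition["Values"])
        new_values[current_index] = flag
        return {
            # Identifies the partition as it exists right now.
            "PartitionValueList": list(partition["Values"]),
            # Describes the partition as it should look afterwards.
            "PartitionInput": {
                "Values": new_values,
                "StorageDescriptor": partition["StorageDescriptor"],
            },
        }

    return [entry(old_partition, "No"), entry(new_partition, "Yes")]


def swap_current(glue, database, table, old_partition, new_partition):
    """Send both flag changes in a single BatchUpdatePartition call."""
    return glue.batch_update_partition(
        DatabaseName=database,
        TableName=table,
        Entries=swap_current_entries(old_partition, new_partition),
    )


# Usage (not run here; requires AWS credentials):
#   import boto3
#   glue = boto3.client("glue")
#   response = swap_current(glue, "mydb", "getresources", old, new)
#   # response["Errors"] lists any entries that failed.
```

Both changes travel in one request, but as noted above that should not be read as a guarantee of atomicity.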
However, it's a bit of a hack to use a partition key like this to mark the most recent partition. Using microsecond-resolution timestamps as partition keys also makes me question what it is you are trying to achieve. Athena is not a low-latency database, and it generally performs really poorly with many partitions with small files. Using it to find the last written file is never going to be great.
I suspect that each of your partitions contains a single file, and if that is the case, wouldn't it be easier and more performant to just do an S3 listing and either grab the object or do an S3 Select?
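To make the S3 listing idea concrete, here is a small sketch that picks the most recent `stamp=` value out of a list of object keys. It relies on the stamp format from the question (zero-padded, so plain string comparison orders it chronologically). The helper name `latest_stamp` and the file name `data.json` in the test keys are made up for illustration; the keys themselves could come from a Boto3 `list_objects_v2` paginator.

```python
import re

# Matches the stamp=... path segment used in the question's S3 layout.
STAMP_RE = re.compile(r"/stamp=([^/]+)/")


def latest_stamp(keys):
    """Return the greatest stamp found in `keys`, or None if there is none.

    Stamps such as 2020-09-25T08_02_50.925047 are zero-padded, so the
    lexicographic maximum is also the most recent one.
    """
    stamps = []
    for key in keys:
        match = STAMP_RE.search(key)
        if match:
            stamps.append(match.group(1))
    return max(stamps, default=None)


# Usage (not run here; requires AWS credentials):
#   import boto3
#   paginator = boto3.client("s3").get_paginator("list_objects_v2")
#   prefix = "api/resourcegroupstaggingapi/getresources/ac=123456789012/"
#   keys = [
#       obj["Key"]
#       for page in paginator.paginate(Bucket="mybucket", Prefix=prefix)
#       for obj in page.get("Contents", [])
#   ]
#   print(latest_stamp(keys))
```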
If that is not an option, could the process that runs the query do an API call before the query? In that case it could look up the most recent partition in the Glue Data Catalog API, or list S3, or you could write the latest partition into Parameter Store and read it from there. I think there are a lot of ways this can be improved but I don't have the information to help you.
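The Parameter Store variant could look roughly like the sketch below: the writer publishes the newest stamp after its data has landed, and the reader fetches it and puts it into the query's WHERE clause. The `ssm` argument is assumed to be a Boto3 SSM client, and the helper names and the parameter name `/getresources/latest-stamp` are hypothetical.

```python
import re

# Only characters that occur in stamps like 2020-09-25T08_02_50.925047.
SAFE_STAMP_RE = re.compile(r"[0-9T_.\-]+")


def publish_latest(ssm, name, stamp):
    """Writer side: record the newest stamp once its data is in S3."""
    ssm.put_parameter(Name=name, Value=stamp, Type="String", Overwrite=True)


def read_latest(ssm, name):
    """Reader side: fetch the stamp that queries should use."""
    return ssm.get_parameter(Name=name)["Parameter"]["Value"]


def latest_query(table, stamp):
    """Build a query restricted to one stamp, with no max(stamp) subquery."""
    if not SAFE_STAMP_RE.fullmatch(stamp):
        raise ValueError(f"unexpected stamp format: {stamp!r}")
    return f"SELECT * FROM {table} WHERE stamp = '{stamp}'"


# Usage (not run here; requires AWS credentials):
#   import boto3
#   ssm = boto3.client("ssm")
#   publish_latest(ssm, "/getresources/latest-stamp", "2020-09-25T08_02_50.925047")
#   sql = latest_query("mydb.getresources", read_latest(ssm, "/getresources/latest-stamp"))
```

With this layout the `current` partition key is no longer needed, since the pointer to the latest version lives outside the catalog.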
If you describe in more detail what you are trying to achieve maybe we can help you find a solution that will work better.
Upvotes: 1