Tom Ron

Reputation: 6181

AWS Athena to write and create tagged objects as s3 output

I want to run an Athena query. Athena always writes the results to S3, and I want all of the resulting files to have tags, for example the tags of the workgroup the query runs in.

In that case, the results of

client.start_query_execution(
    QueryString='some query',
    ResultConfiguration={
        'OutputLocation': 's3://path/to/query/bucket/'
    },
    WorkGroup='my_wg'
)

would be written to s3://path/to/query/bucket/ with the tags of workgroup my_wg. Is this possible, and if so, how?

Thanks in advance

Upvotes: 1

Views: 328

Answers (1)

Oluwafemi Sule

Reputation: 38982

S3 events for objects written by Athena don't carry any information (such as the workgroup) that a Lambda function could use for tagging, so we can rule that out as a solution for now.

The other option, which needs no code at all, is to set a different output-location path prefix for each of your workgroups.
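A minimal sketch of that configuration with boto3, assuming you manage the workgroups yourself. The helper names workgroup_output_location and set_workgroup_output_prefix and the base URI are hypothetical; update_work_group with ResultConfigurationUpdates and EnforceWorkGroupConfiguration is the Athena API call being used. With enforcement turned on, an OutputLocation passed to start_query_execution is overridden by the workgroup's own setting.

```python
from typing import Any


def workgroup_output_location(base_uri: str, workgroup: str) -> str:
    """Build a per-workgroup prefix, e.g. s3://bucket/athena/my_wg/ (hypothetical layout)."""
    return f"{base_uri.rstrip('/')}/{workgroup}/"


def set_workgroup_output_prefix(athena_client: Any, workgroup: str, base_uri: str) -> str:
    """Point a workgroup's query results at its own S3 prefix and return that prefix.

    EnforceWorkGroupConfiguration=True makes the workgroup's result location
    override any OutputLocation a client passes to start_query_execution.
    """
    location = workgroup_output_location(base_uri, workgroup)
    athena_client.update_work_group(
        WorkGroup=workgroup,
        ConfigurationUpdates={
            "EnforceWorkGroupConfiguration": True,
            "ResultConfigurationUpdates": {"OutputLocation": location},
        },
    )
    return location
```

Usage would be along the lines of set_workgroup_output_prefix(boto3.client("athena"), "my_wg", "s3://my-results-bucket/athena"), where the bucket name is a placeholder. Bucket policies, lifecycle rules or cost reports can then key off the prefix instead of object tags.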

If you want to tag the results manually instead, note that start_query_execution returns a QueryExecutionId.

You can use it to look up where the query results were written and then tag that object.

import time
import boto3
from urllib.parse import urlparse


workgroup = "my_wg"
workgroup_tag = {"Key": "workgroup", "Value": workgroup}

athena_client = boto3.client("athena", region_name="us-east-1")
response = athena_client.start_query_execution(
    QueryString="some query",
    ResultConfiguration={
        "OutputLocation": "s3://path/to/query/bucket/"
    },
    WorkGroup=workgroup
)
query_execution_id = response['QueryExecutionId']

state = None
output_location = None

# Poll until the query leaves the QUEUED/RUNNING states.
while True:
    response = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )
    query_execution = response["QueryExecution"]
    state = query_execution["Status"]["State"]
    if state in ["QUEUED", "RUNNING"]:
        time.sleep(5)
    else:
        if state == "SUCCEEDED":
            output_location = query_execution["ResultConfiguration"]["OutputLocation"]
        break

# Now proceed to tagging
s3_client = boto3.client("s3", region_name="us-east-1")
if output_location:
    parse_result = urlparse(output_location)
    bucket = parse_result.netloc
    key = parse_result.path.lstrip('/')

    # Add the tag to the object's existing tags, first dropping any old tag
    # with the same key (S3 rejects a TagSet that repeats a key).
    response = s3_client.get_object_tagging(Bucket=bucket, Key=key)
    tag_set = [t for t in response["TagSet"] if t["Key"] != workgroup_tag["Key"]]
    tag_set.append(workgroup_tag)
    tagging = {"TagSet": tag_set}

    response = s3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging)
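The question asks for the workgroup's own tags rather than one hard-coded tag. A sketch of that variant, under these assumptions: you pass in the workgroup ARN yourself (it has the form arn:aws:athena:<region>:<account-id>:workgroup/<name>); Athena's list_tags_for_resource returns tags in the same Key/Value shape that S3's TagSet uses; and the helper names below are hypothetical. S3 allows at most 10 tags per object, so put_object_tagging will fail if the merged set grows past that.

```python
from typing import Any, Dict, List, Tuple
from urllib.parse import urlparse

Tag = Dict[str, str]


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split s3://bucket/path/to/key into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def merge_tags(existing: List[Tag], new: List[Tag]) -> List[Tag]:
    """Merge tag lists; a key in `new` replaces the same key in `existing`."""
    merged = {t["Key"]: t["Value"] for t in existing}
    merged.update({t["Key"]: t["Value"] for t in new})
    return [{"Key": k, "Value": v} for k, v in merged.items()]


def workgroup_tags(athena_client: Any, workgroup_arn: str) -> List[Tag]:
    """Collect every tag on the workgroup, following NextToken pagination."""
    tags: List[Tag] = []
    kwargs: Dict[str, Any] = {"ResourceARN": workgroup_arn}
    while True:
        response = athena_client.list_tags_for_resource(**kwargs)
        tags.extend(response.get("Tags", []))
        token = response.get("NextToken")
        if not token:
            return tags
        kwargs["NextToken"] = token


def tag_result_with_workgroup_tags(
    athena_client: Any, s3_client: Any, workgroup_arn: str, output_location: str
) -> List[Tag]:
    """Copy the workgroup's tags onto the query result object and return the new TagSet."""
    bucket, key = split_s3_uri(output_location)
    existing = s3_client.get_object_tagging(Bucket=bucket, Key=key)["TagSet"]
    tag_set = merge_tags(existing, workgroup_tags(athena_client, workgroup_arn))
    s3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging={"TagSet": tag_set})
    return tag_set
```

After the polling loop you would call tag_result_with_workgroup_tags(athena_client, s3_client, workgroup_arn, output_location). Athena typically also writes a companion .metadata object next to the result file; if you want that tagged as well, call the function again with that object's URI.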

Given how much effort it takes to tag query results this way, I recommend configuring a different output-location path prefix for each workgroup instead. That way you don't have to tag the results manually.

Upvotes: 2
