Reputation: 6181
I want to run an Athena query. Athena always writes the results to S3. I want all of the resulting files to have tags, for example the tags of the workgroup the query runs in.
In that example, the results of
client.start_query_execution(
    QueryString='some query',
    ResultConfiguration={
        'OutputLocation': 's3://path/to/query/bucket/'
    },
    WorkGroup='my_wg'
)
would be written to s3://path/to/query/bucket/ with the tags of workgroup my_wg. Is this possible, and if so, how?
Thanks in advance
Upvotes: 1
Views: 328
Reputation: 38982
S3 events for objects written by Athena don't carry any information, such as the workgroup, that would be useful for tagging through a Lambda function. We can rule that out as a solution for now.
The other way, which requires no tagging code, is to set a different path prefix for the output location of each workgroup.
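As a rough sketch of the prefix approach, the helper below sets a workgroup's output location through boto3's `update_work_group`. The function name and the `<base>/<workgroup>/` layout are assumptions made for illustration, not something Athena prescribes; the client is passed in so the helper can be tried without AWS credentials.

```python
def set_workgroup_output_prefix(athena_client, workgroup, base_location):
    """Point a workgroup's query results at its own S3 prefix.

    Assumed layout (illustrative only): <base_location>/<workgroup>/
    """
    output_location = f"{base_location.rstrip('/')}/{workgroup}/"
    athena_client.update_work_group(
        WorkGroup=workgroup,
        ConfigurationUpdates={
            # Use the workgroup's settings even when a caller passes
            # its own ResultConfiguration to start_query_execution.
            "EnforceWorkGroupConfiguration": True,
            "ResultConfigurationUpdates": {"OutputLocation": output_location},
        },
    )
    return output_location


# Possible usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   set_workgroup_output_prefix(
#       boto3.client("athena"), "my_wg", "s3://path/to/query/bucket"
#   )
```

With the configuration enforced, results for each workgroup land under their own prefix, so the prefix itself identifies the workgroup.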
If you want to proceed with manual tagging, the QueryExecutionId is returned in the start_query_execution response.
With it, you can retrieve the location of the object the query results are written to and tag that object.
import time
import boto3
from urllib.parse import urlparse
workgroup = "my_wg"
workgroup_tag = {"Key": "workgroup", "Value": workgroup}
athena_client = boto3.client("athena", region_name="us-east-1")
response = athena_client.start_query_execution(
    QueryString="some query",
    ResultConfiguration={
        "OutputLocation": "s3://path/to/query/bucket/"
    },
    WorkGroup=workgroup
)
query_execution_id = response['QueryExecutionId']
state = None
output_location = None
# Poll until the query leaves the QUEUED/RUNNING states.
while True:
    response = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )
    query_execution = response["QueryExecution"]
    state = query_execution["Status"]["State"]
    if state in ["QUEUED", "RUNNING"]:
        time.sleep(5)
    else:
        # Only a successful query has a result object to tag.
        if state == "SUCCEEDED":
            output_location = query_execution["ResultConfiguration"]["OutputLocation"]
        break
# Now proceed to tagging
s3_client = boto3.client("s3", region_name="us-east-1")
if output_location:
    # Split s3://bucket/key into its bucket and key parts.
    parse_result = urlparse(output_location)
    bucket = parse_result.netloc
    key = parse_result.path.lstrip('/')
    # Add the tag to the existing tags on the object.
    response = s3_client.get_object_tagging(Bucket=bucket, Key=key)
    tag_set = response["TagSet"]
    tag_set.append(workgroup_tag)
    tagging = {"TagSet": tag_set}
    response = s3_client.put_object_tagging(Bucket=bucket, Key=key, Tagging=tagging)
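The code above attaches a single hard-coded tag. To copy the workgroup's own tags, as the question asks, something like the following sketch might work. It assumes you already know the workgroup's ARN (passed in as `workgroup_arn`); the helper names are hypothetical. It reads the tags with Athena's `list_tags_for_resource` and merges them by key, since `put_object_tagging` replaces the whole tag set and a tag set must not repeat a key.

```python
def merge_tag_sets(existing, new):
    """Merge two TagSet lists; tags in `new` win on duplicate keys."""
    merged = {tag["Key"]: tag["Value"] for tag in existing}
    merged.update({tag["Key"]: tag["Value"] for tag in new})
    return [{"Key": k, "Value": v} for k, v in merged.items()]


def tag_object_with_workgroup_tags(athena_client, s3_client,
                                   workgroup_arn, bucket, key):
    """Copy a workgroup's tags onto one S3 object (hypothetical helper)."""
    # Only the first page of tags is read here; a workgroup with many
    # tags would need NextToken pagination.
    wg_tags = athena_client.list_tags_for_resource(
        ResourceARN=workgroup_arn
    )["Tags"]
    existing = s3_client.get_object_tagging(Bucket=bucket, Key=key)["TagSet"]
    tag_set = merge_tag_sets(existing, wg_tags)
    # put_object_tagging overwrites the object's tag set, so send the
    # merged list rather than only the new tags.
    s3_client.put_object_tagging(
        Bucket=bucket, Key=key, Tagging={"TagSet": tag_set}
    )
    return tag_set
```

Keep in mind that S3 allows at most 10 tags per object, so a workgroup with more tags than that cannot be mirrored in full.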
Given how much ado it takes to tag query results this way, I recommend configuring a different path prefix for the output location of each workgroup. That way you don't have to tag the results manually.
Upvotes: 2